Posted to commits@atlas.apache.org by ma...@apache.org on 2016/12/23 01:46:48 UTC

[19/50] [abbrv] incubator-atlas git commit: ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)

ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)

(cherry picked from commit 6fddccd6a28d7cf194926c3d44debeb1b49cd434)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/cf433a53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/cf433a53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/cf433a53

Branch: refs/heads/0.7-incubating
Commit: cf433a53a25dd750992fed6f65944b4e90ed79fe
Parents: 769aaf1
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Tue Aug 16 11:36:45 2016 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Dec 22 15:21:04 2016 -0800

----------------------------------------------------------------------
 .../atlas/storm/hook/StormTopologyUtil.java     | 124 ++++++++++---------
 release-log.txt                                 |   1 +
 2 files changed, 66 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/cf433a53/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
index dc2ae57..edd95ba 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
@@ -36,6 +36,7 @@ import java.util.Set;
  * A storm topology utility class.
  */
 public final class StormTopologyUtil {
+    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormTopologyUtil.class);
 
     private StormTopologyUtil() {
     }
@@ -136,74 +137,79 @@ public final class StormTopologyUtil {
 
         Map<String, String> output = new HashMap<>();
 
-        if (objectsToSkip.add(instance)) {
-            Class clazz = instance.getClass();
-            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
-                Field[] fields = c.getDeclaredFields();
-                for (Field field : fields) {
-                    if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
-                        continue;
-                    }
-
-                    String key;
-                    if (prependClassName) {
-                        key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
-                    } else {
-                        key = field.getName();
-                    }
+        try {
+            if (objectsToSkip.add(instance)) {
+                Class clazz = instance.getClass();
+                for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                    Field[] fields = c.getDeclaredFields();
+                    for (Field field : fields) {
+                        if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                            continue;
+                        }
 
-                    boolean accessible = field.isAccessible();
-                    if (!accessible) {
-                        field.setAccessible(true);
-                    }
-                    Object fieldVal = field.get(instance);
-                    if (fieldVal == null) {
-                        continue;
-                    } else if (fieldVal.getClass().isPrimitive() ||
-                            isWrapperType(fieldVal.getClass())) {
-                        if (toString(fieldVal, false).isEmpty()) continue;
-                        output.put(key, toString(fieldVal, false));
-                    } else if (isMapType(fieldVal.getClass())) {
-                        //TODO: check if it makes more sense to just stick to json
-                        // like structure instead of a flatten output.
-                        Map map = (Map) fieldVal;
-                        for (Object entry : map.entrySet()) {
-                            Object mapKey = ((Map.Entry) entry).getKey();
-                            Object mapVal = ((Map.Entry) entry).getValue();
-
-                            String keyStr = getString(mapKey, false, objectsToSkip);
-                            String valStr = getString(mapVal, false, objectsToSkip);
-                            if ((valStr == null) || (valStr.isEmpty())) {
-                                continue;
-                            } else {
-                                output.put(String.format("%s.%s", key, keyStr), valStr);
-                            }
+                        String key;
+                        if (prependClassName) {
+                            key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
+                        } else {
+                            key = field.getName();
                         }
-                    } else if (isCollectionType(fieldVal.getClass())) {
-                        //TODO check if it makes more sense to just stick to
-                        // json like structure instead of a flatten output.
-                        Collection collection = (Collection) fieldVal;
-                        if (collection.size() == 0) continue;
-                        String outStr = "";
-                        for (Object o : collection) {
-                            outStr += getString(o, false, objectsToSkip) + ",";
+
+                        boolean accessible = field.isAccessible();
+                        if (!accessible) {
+                            field.setAccessible(true);
                         }
-                        if (outStr.length() > 0) {
-                            outStr = outStr.substring(0, outStr.length() - 1);
+                        Object fieldVal = field.get(instance);
+                        if (fieldVal == null) {
+                            continue;
+                        } else if (fieldVal.getClass().isPrimitive() ||
+                                isWrapperType(fieldVal.getClass())) {
+                            if (toString(fieldVal, false).isEmpty()) continue;
+                            output.put(key, toString(fieldVal, false));
+                        } else if (isMapType(fieldVal.getClass())) {
+                            //TODO: check if it makes more sense to just stick to json
+                            // like structure instead of a flatten output.
+                            Map map = (Map) fieldVal;
+                            for (Object entry : map.entrySet()) {
+                                Object mapKey = ((Map.Entry) entry).getKey();
+                                Object mapVal = ((Map.Entry) entry).getValue();
+
+                                String keyStr = getString(mapKey, false, objectsToSkip);
+                                String valStr = getString(mapVal, false, objectsToSkip);
+                                if ((valStr == null) || (valStr.isEmpty())) {
+                                    continue;
+                                } else {
+                                    output.put(String.format("%s.%s", key, keyStr), valStr);
+                                }
+                            }
+                        } else if (isCollectionType(fieldVal.getClass())) {
+                            //TODO check if it makes more sense to just stick to
+                            // json like structure instead of a flatten output.
+                            Collection collection = (Collection) fieldVal;
+                            if (collection.size() == 0) continue;
+                            String outStr = "";
+                            for (Object o : collection) {
+                                outStr += getString(o, false, objectsToSkip) + ",";
+                            }
+                            if (outStr.length() > 0) {
+                                outStr = outStr.substring(0, outStr.length() - 1);
+                            }
+                            output.put(key, String.format("%s", outStr));
+                        } else {
+                            Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
+                            for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
+                                output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
+                            }
                         }
-                        output.put(key, String.format("%s", outStr));
-                    } else {
-                        Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
-                        for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
-                            output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
+                        if (!accessible) {
+                            field.setAccessible(false);
                         }
                     }
-                    if (!accessible) {
-                        field.setAccessible(false);
-                    }
                 }
             }
         }
+        catch (Exception e){
+            LOG.warn("Exception while constructing topology", e);
+        }
         return output;
     }
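
For context, the change above wraps the reflection-driven field walk in StormTopologyUtil.getFieldValues() in a try/catch backed by an SLF4J logger, so an unexpected NullPointerException no longer aborts topology submission; the hook logs a warning and returns whatever output it has collected so far. Below is a minimal, self-contained sketch of that defensive pattern, not the actual Atlas class; the class and method names (ReflectiveFieldDump, dumpFields) are illustrative assumptions.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for StormTopologyUtil, reduced to the pattern this commit applies.
    public final class ReflectiveFieldDump {
        private static final Logger LOG = LoggerFactory.getLogger(ReflectiveFieldDump.class);

        private ReflectiveFieldDump() {
        }

        public static Map<String, String> dumpFields(Object instance) {
            Map<String, String> output = new HashMap<>();

            try {
                // Walk the class hierarchy and record non-static, non-null field values.
                for (Class<?> c = instance.getClass(); c != null; c = c.getSuperclass()) {
                    for (Field field : c.getDeclaredFields()) {
                        if (Modifier.isStatic(field.getModifiers())) {
                            continue;
                        }
                        field.setAccessible(true);
                        Object value = field.get(instance);
                        if (value != null) {
                            output.put(field.getName(), String.valueOf(value));
                        }
                    }
                }
            } catch (Exception e) {
                // Mirrors the fix: log and fall through instead of failing the Storm hook.
                LOG.warn("Exception while constructing topology", e);
            }

            return output;
        }
    }

A caller such as dumpFields(topologyInstance) would now receive a (possibly partial) map even if reflection trips over a null-holding or otherwise problematic field, which is the behavior ATLAS-1121 is after.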
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/cf433a53/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index c55dffd..7ffb5dc 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -28,6 +28,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)
 ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
 ATLAS-1108: updated references to atlas.rest.address to handle multiple URLs
 ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai)