You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2015/09/17 06:58:55 UTC

[2/8] incubator-apex-core git commit: APEX-28 #resolve

APEX-28 #resolve

- Added the ability to specify attributes in a properties xml as dt.attr.MY_ATTR
- Added handling for ambiguous attributes
- Added the ability to specify attributes for operators and ports on parent elements like applications
- Cleaned up commented out code and warnings in LogicalPlanConfiguration


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/434a7170
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/434a7170
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/434a7170

Branch: refs/heads/devel-3
Commit: 434a7170e4a4e2da013e283706b5bf42f853f6a4
Parents: d748ed4
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Aug 11 10:50:57 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Sun Aug 30 18:19:40 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/api/Attribute.java     |   11 +-
 .../main/java/com/datatorrent/api/Context.java  |   10 +
 .../stram/plan/logical/LogicalPlan.java         |    8 +-
 .../plan/logical/LogicalPlanConfiguration.java  | 1145 ++++++++++++++----
 .../plan/LogicalPlanConfigurationTest.java      | 1034 +++++++++++++++-
 .../datatorrent/stram/plan/LogicalPlanTest.java |    2 +
 .../stram/plan/logical/MockStorageAgent.java    |   67 +
 engine/src/test/resources/testTopology.json     |    4 +-
 8 files changed, 2008 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index a7492b5..4c16a2a 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -277,6 +277,13 @@ public class Attribute<T> implements Serializable
         if (map.containsKey(clazz)) {
           return 0;
         }
+
+        map.put(clazz, getAttributesNoSave(clazz));
+        return (long)clazz.getModifiers() << 32 | clazz.hashCode();
+      }
+
+      public static Set<Attribute<Object>> getAttributesNoSave(Class<?> clazz)
+      {
         Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
@@ -323,8 +330,8 @@ public class Attribute<T> implements Serializable
         catch (Exception ex) {
           DTThrowable.rethrow(ex);
         }
-        map.put(clazz, set);
-        return (long)clazz.getModifiers() << 32 | clazz.hashCode();
+
+        return set;
       }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index cd10398..c2d974a 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -33,6 +33,16 @@ import com.datatorrent.api.annotation.Stateless;
  */
 public interface Context
 {
+  /*
+   * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that
+   * Attribute is required to be the same across all Context classes. This is required because if a simple attribute
+   * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If
+   * there were multiple Attributes specified in different Contexts with the same name, but a different type, then
+   * it would not be possible to set the values of Attributes specified by a simple attribute name in the root
+   * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context
+   * class would be a backwards incompatible change.
+   */
+
   /**
    * Get the attributes associated with this context.
    * The returned map does not contain any attributes that may have been defined in the parent context of this context.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 6741d37..8826896 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1430,8 +1430,12 @@ public class LogicalPlan implements Serializable, DAG
     }
 
     for (StreamMeta s: streams.values()) {
-      if (s.source == null || (s.sinks.isEmpty())) {
-        throw new ValidationException(String.format("stream not connected: %s", s.getName()));
+      if (s.source == null) {
+        throw new ValidationException(String.format("stream source not connected: %s", s.getName()));
+      }
+
+      if (s.sinks.isEmpty()) {
+        throw new ValidationException(String.format("stream sink not connected: %s", s.getName()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 46291a8..a3a18c2 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -20,12 +20,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;
 import java.util.Map.Entry;
 
+import jline.internal.Preconditions;
+
 import javax.validation.ValidationException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -37,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.commons.lang3.StringUtils;
@@ -44,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer;
+import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
@@ -54,6 +61,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
 import com.datatorrent.stram.util.ObjectMapperFactory;
 
 /**
@@ -88,30 +96,51 @@ public class LogicalPlanConfiguration {
   public static final String TEMPLATE_classNameRegExp = "matchClassNameRegExp";
 
   public static final String CLASS = "class";
+  public static final String KEY_SEPARATOR = ".";
+  public static final String KEY_SEPARATOR_SPLIT_REGEX = "\\.";
 
   private static final String CLASS_SUFFIX = "." + CLASS;
 
   private static final String WILDCARD = "*";
   private static final String WILDCARD_PATTERN = ".*";
 
+  /**
+   * This is done to initialize the serial id of these interfaces.
+   */
   static {
     Object serial[] = new Object[] {Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID};
     LOG.debug("Initialized attributes {}", serial);
   }
 
-  private enum StramElement {
+  /**
+   * This represents an element that can be referenced in a DT property.
+   */
+  protected enum StramElement {
     APPLICATION("application"), GATEWAY("gateway"), TEMPLATE("template"), OPERATOR("operator"),STREAM("stream"), PORT("port"), INPUT_PORT("inputport"),OUTPUT_PORT("outputport"),
     ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path");
     private final String value;
 
+    /**
+     * Creates a {@link StramElement} with the corresponding name.
+     * @param value The name of the {@link StramElement}.
+     */
     StramElement(String value) {
       this.value = value;
     }
 
+    /**
+     * Gets the name of the {@link StramElement}.
+     * @return The name of the {@link StramElement}.
+     */
     public String getValue() {
       return value;
     }
 
+    /**
+     * Gets the {@link StramElement} corresponding to the given name.
+     * @param value The name for which a {@link StramElement} is desired.
+     * @return The {@link StramElement} corresponding to the given name.
+     */
     public static StramElement fromValue(String value) {
       StramElement velement = null;
       for (StramElement element : StramElement.values()) {
@@ -124,7 +153,754 @@ public class LogicalPlanConfiguration {
     }
 
   }
-  
+
+  /**
+   * This is an enum which represents a type of configuration.
+   */
+  protected enum ConfElement
+  {
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    STRAM(null,
+          null,
+          new HashSet<StramElement>(),
+          null),
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    APPLICATION(StramElement.APPLICATION,
+                STRAM,
+                new HashSet<StramElement>(),
+                DAGContext.class),
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    TEMPLATE(StramElement.TEMPLATE,
+             STRAM,
+             new HashSet<StramElement>(),
+             null),
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    GATEWAY(StramElement.GATEWAY,
+            ConfElement.APPLICATION,
+            new HashSet<StramElement>(),
+            null),
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    OPERATOR(StramElement.OPERATOR,
+             ConfElement.APPLICATION,
+             new HashSet<StramElement>(),
+             OperatorContext.class),
+    @SuppressWarnings("SetReplaceableByEnumSet")
+    STREAM(StramElement.STREAM,
+           ConfElement.APPLICATION,
+           new HashSet<StramElement>(),
+           null),
+    PORT(StramElement.PORT,
+         ConfElement.OPERATOR,
+         Sets.newHashSet(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT),
+         PortContext.class);
+
+    public static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
+    public static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
+
+    static {
+      initialize();
+    }
+
+    protected static void initialize()
+    {
+      STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE));
+      APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM));
+      OPERATOR.setChildren(Sets.newHashSet(PORT));
+
+      STRAM_ELEMENT_TO_CONF_ELEMENT.clear();
+
+      //Initialize StramElement to ConfElement
+      for (ConfElement confElement: ConfElement.values()) {
+        STRAM_ELEMENT_TO_CONF_ELEMENT.put(confElement.getStramElement(), confElement);
+
+        for (StramElement sElement: confElement.getAllRelatedElements()) {
+          STRAM_ELEMENT_TO_CONF_ELEMENT.put(sElement, confElement);
+        }
+      }
+
+      //Initialize attributes
+      for (ConfElement confElement: ConfElement.values()) {
+        if (confElement.getParent() == null) {
+          continue;
+        }
+
+        setAmbiguousAttributes(confElement);
+      }
+
+      // build context to conf element map
+      CONTEXT_TO_CONF_ELEMENT.clear();
+
+      for (ConfElement confElement: ConfElement.values()) {
+        CONTEXT_TO_CONF_ELEMENT.put(confElement.getContextClass(), confElement);
+      }
+
+      //Check if all the context classes are accounted for
+      Set<Class<? extends Context>> confElementContextClasses = Sets.newHashSet();
+
+      for (ConfElement confElement: ConfElement.values()) {
+        if (confElement.getContextClass() == null) {
+          continue;
+        }
+
+        confElementContextClasses.add(confElement.getContextClass());
+      }
+
+      if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) {
+        throw new IllegalStateException("All the context classes "
+                                        + ContextUtils.CONTEXT_CLASSES
+                                        + " found in "
+                                        + Context.class
+                                        + " are not used by ConfElements "
+                                        + confElementContextClasses);
+      }
+    }
+
+    /**
+     * This is a recursive method to initialize the ambiguous attributes for each
+     * {@link ConfElement}.
+     *
+     * @param element The current {@link ConfElement} at which to start initializing
+     * the ambiguous attributes.
+     * @return The set of all simple attribute names encountered up to this point.
+     */
+    public static Set<String> setAmbiguousAttributes(ConfElement element)
+    {
+      Set<String> ambiguousAttributes = Sets.newHashSet();
+      Set<String> allChildAttributes = Sets.newHashSet(element.getContextAttributes());
+
+      for (ConfElement childElement: element.getChildren()) {
+        Set<String> allAttributes = setAmbiguousAttributes(childElement);
+        ambiguousAttributes.addAll(childElement.getAmbiguousAttributes());
+
+        @SuppressWarnings("unchecked")
+        Set<String> intersection = (Set<String>)Sets.newHashSet(CollectionUtils.intersection(allChildAttributes,
+                                                                                             allAttributes));
+        ambiguousAttributes.addAll(intersection);
+        allChildAttributes.addAll(allAttributes);
+      }
+
+      element.setAmbiguousAttributes(ambiguousAttributes);
+      element.setAllChildAttributes(allChildAttributes);
+
+      return allChildAttributes;
+    }
+
+    private final StramElement element;
+    private final ConfElement parent;
+    private Set<ConfElement> children = Sets.newHashSet();
+    private final Set<StramElement> allRelatedElements = Sets.newHashSet();
+    private final Class<? extends Context> contextClass;
+    private Set<String> ambiguousAttributes = Sets.newHashSet();
+    private Set<String> contextAttributes = Sets.newHashSet();
+    private Set<String> allChildAttributes = Sets.newHashSet();
+
+    /**
+     * This creates a {@link ConfElement}.
+     *
+     * @param element The current {@link StramElement} representing a {@link ConfElement}.
+     * @param parent The parent {@link ConfElement}.
+     * @param additionalRelatedElements Any additional {@link StramElement} that could be
+     * related to this {@link ConfElement}.
+     * @param contextClass The {@link Context} class that contains all the attributes to
+     * be used by this {@link ConfElement}.
+     */
+    ConfElement(StramElement element,
+                ConfElement parent,
+                Set<StramElement> additionalRelatedElements,
+                Class<? extends Context> contextClass)
+    {
+      this.element = element;
+      this.parent = parent;
+
+      this.allRelatedElements.addAll(additionalRelatedElements);
+      this.allRelatedElements.add(element);
+
+      this.contextClass = contextClass;
+
+      if (contextClass != null) {
+        this.contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
+      } else {
+        this.contextAttributes = Sets.newHashSet();
+      }
+    }
+
+    private void setAllChildAttributes(Set<String> allChildAttributes)
+    {
+      this.allChildAttributes = Preconditions.checkNotNull(allChildAttributes);
+    }
+
+    public Set<String> getAllChildAttributes()
+    {
+      return allChildAttributes;
+    }
+
+    private void setAmbiguousAttributes(Set<String> ambiguousAttributes)
+    {
+      this.ambiguousAttributes = Preconditions.checkNotNull(ambiguousAttributes);
+    }
+
+    /**
+     * Gets the simple names of attributes which are specified under multiple configurations which
+     * include this configuration or any child configurations.
+     *
+     * @return The set of ambiguous simple attribute names.
+     */
+    public Set<String> getAmbiguousAttributes()
+    {
+      return ambiguousAttributes;
+    }
+
+    /**
+     * Gets the {@link Context} class that corresponds to this {@link ConfElement}.
+     *
+     * @return The {@link Context} class that corresponds to this {@link ConfElement}.
+     */
+    public Class<? extends Context> getContextClass()
+    {
+      return contextClass;
+    }
+
+    /**
+     * Gets the {@link StramElement} representing this {@link ConfElement}.
+     *
+     * @return The {@link StramElement} corresponding to this {@link ConfElement}.
+     */
+    public StramElement getStramElement()
+    {
+      return element;
+    }
+
+    /**
+     * Gets the attributes contained in the {@link Context} associated with this {@link ConfElement}.
+     *
+     * @return A {@link java.util.Set} containing the simple attribute names of all of the attributes
+     * contained in the {@link Context} associated with this {@link ConfElement}.
+     */
+    public Set<String> getContextAttributes()
+    {
+      return contextAttributes;
+    }
+
+    /**
+     * Gets the {@link ConfElement} that is the parent of this {@link ConfElement}.
+     *
+     * @return The {@link ConfElement} that is the parent of this {@link ConfElement}.
+     */
+    public ConfElement getParent()
+    {
+      return parent;
+    }
+
+    /**
+     * Sets the child {@link ConfElement}s of this {@link ConfElement}.
+     *
+     * @param children The child {@link ConfElement}s of this {@link ConfElement}.
+     */
+    private void setChildren(Set<ConfElement> children)
+    {
+      this.children = Preconditions.checkNotNull(children);
+    }
+
+    /**
+     * Gets the child {@link ConfElement}s of this {@link ConfElement}.
+     *
+     * @return The child {@link ConfElement}s of this {@link ConfElement}.
+     */
+    public Set<ConfElement> getChildren()
+    {
+      return children;
+    }
+
+    /**
+     * Gets all the {@link StramElement}s that are represented by this {@link ConfElement}.
+     *
+     * @return All the {@link StramElement}s that are represented by this {@link ConfElement}.
+     */
+    public Set<StramElement> getAllRelatedElements()
+    {
+      return allRelatedElements;
+    }
+
+    /**
+     * Gets the {@link StramElement} representing the {@link Conf} type which can be a parent of the {@link Conf} type
+     * represented by the given {@link StramElement}.
+     *
+     * @param conf The {@link StramElement} representing the {@link Conf} type of interest.
+     * @return The {@link StramElement} representing the {@link Conf} type which can be a parent of the given {@link Conf} type.
+     */
+    public static StramElement getAllowedParentConf(StramElement conf)
+    {
+      ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf);
+
+      if (confElement == null) {
+        throw new IllegalArgumentException(conf + " is not a valid conf element.");
+      }
+
+      return confElement.getParent().getStramElement();
+    }
+
+    /**
+     * Creates a list of {@link StramElement}s which represent the path from the current {@link Conf} type to
+     * a root {@link Conf} type. This path includes the current {@link Conf} type as well as the root.
+     *
+     * @param conf The current {@link Conf} type.
+     * @return A path from the current {@link Conf} type to a root {@link Conf} type,
+     * which includes the current and root
+     * {@link Conf} types.
+     */
+    public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf)
+    {
+      ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf);
+
+      if (confElement == null) {
+        throw new IllegalArgumentException(conf + " does not represent a valid configuration type.");
+      }
+
+      List<StramElement> path = Lists.newArrayList();
+
+      for (; confElement != null; confElement = confElement.getParent()) {
+        path.add(confElement.getStramElement());
+      }
+
+      return path;
+    }
+
+    /**
+     * Creates a list of {@link StramElement}s which represent the path from the root {@link Conf} type to
+     * the current {@link Conf} type. This path includes the root {@link Conf} type as well as the current {@link Conf} type.
+     *
+     * @param conf The current {@link Conf} type.
+     * @return A path from the root {@link Conf} type to the current {@link Conf} type,
+     * which includes the current and root
+     * {@link Conf} types.
+     */
+    public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf)
+    {
+      List<StramElement> path = getPathFromChildToRootInclusive(conf);
+      return Lists.reverse(path);
+    }
+
+    /**
+     * Creates a list of {@link StramElement}s which represent the path from the current {@link Conf} type to
+     * a parent {@link Conf} type. This path includes the current {@link Conf} type as well as the parent.
+     *
+     * @param child The current {@link Conf} type.
+     * @param parent The parent {@link Conf} type.
+     * @return A path from the current {@link Conf} type to a parent {@link Conf} type,
+     * which includes the current and parent
+     * {@link Conf} types.
+     */
+    public static List<StramElement> getPathFromChildToParentInclusive(StramElement child,
+                                                                       StramElement parent)
+    {
+      ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child);
+
+      if (confElement == null) {
+        throw new IllegalArgumentException(child + " does not represent a valid configuration type.");
+      }
+
+      List<StramElement> path = Lists.newArrayList();
+
+      if (child == parent) {
+        path.add(child);
+        return path;
+      }
+
+      for (; confElement != null; confElement = confElement.getParent()) {
+        path.add(confElement.getStramElement());
+
+        if (confElement.getStramElement() == parent) {
+          break;
+        }
+      }
+
+      if (path.get(path.size() - 1) != parent) {
+        throw new IllegalArgumentException(parent + " is not a valid parent of " + child);
+      }
+
+      return path;
+    }
+
+    /**
+     * Creates a list of {@link StramElement}s which represent the path from the parent {@link Conf} type to
+     * a child {@link Conf} type. This path includes the parent {@link Conf} type as well as the current {@link Conf} type.
+     *
+     * @param child The current {@link Conf} type.
+     * @param parent The parent {@link Conf} type.
+     * @return A path from the parent {@link Conf} type to the current {@link Conf} type,
+     * which includes the current and parent
+     * {@link Conf} types.
+     */
+    public static List<StramElement> getPathFromParentToChildInclusive(StramElement child,
+                                                                       StramElement parent)
+    {
+      List<StramElement> path = getPathFromChildToParentInclusive(child,
+                                                                  parent);
+      return Lists.reverse(path);
+    }
+
+    /**
+     * This method searches the current {@link ConfElement} and its children to find a {@link ConfElement}
+     * that contains the given simple {@link Attribute} name.
+     *
+     * @param current The current {@link ConfElement}.
+     * @param simpleAttributeName The simple {@link Attribute} name to search for.
+     * @return The {@link ConfElement} that contains the given attribute, or null if no {@link ConfElement} contains
+     * the given attribute.
+     */
+    public static ConfElement findConfElementWithAttribute(ConfElement current,
+                                                           String simpleAttributeName)
+    {
+      if (current.getContextAttributes().contains(simpleAttributeName)) {
+        return current;
+      }
+
+      for (ConfElement childConfElement: current.getChildren()) {
+        ConfElement result = findConfElementWithAttribute(childConfElement,
+                                                          simpleAttributeName);
+
+        if (result != null) {
+          return result;
+        }
+      }
+
+      return null;
+    }
+
+    protected static Conf addConfs(Conf parentConf, ConfElement childConfElement)
+    {
+      //Figure out what configurations need to be added to hold this attribute
+      List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(childConfElement.getStramElement(),
+                                                                              parentConf.getConfElement().getStramElement());
+
+      for (int pathIndex = 1;
+           pathIndex < path.size();
+           pathIndex++) {
+        LOG.debug("Adding conf");
+        StramElement pathElement = path.get(pathIndex);
+        //Add the configurations we need to hold this attribute
+        parentConf = addConf(pathElement, WILDCARD, parentConf);
+      }
+
+      return parentConf;
+    }
+
+  }
+
+  /**
+   * Utility class that holds methods for handling {@link Context} classes.
+   */
+  @SuppressWarnings("unchecked")
+  protected static class ContextUtils
+  {
+    public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES;
+    public static final Set<Class<? extends Context>> CONTEXT_CLASSES;
+    public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE;
+
+    static {
+      CONTEXT_CLASSES = Sets.newHashSet();
+
+      for (Class<?> clazz: Context.class.getDeclaredClasses()) {
+        if (!Context.class.isAssignableFrom(clazz)) {
+          continue;
+        }
+
+        CONTEXT_CLASSES.add((Class<? extends Context>)clazz);
+      }
+
+      CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
+
+      for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+        Set<String> contextAttributes = Sets.newHashSet();
+
+        Field[] fields = contextClass.getDeclaredFields();
+
+        for (Field field: fields) {
+          if (!Attribute.class.isAssignableFrom(field.getType())) {
+            continue;
+          }
+
+          contextAttributes.add(field.getName());
+        }
+
+        CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, contextAttributes);
+      }
+
+      CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();
+
+      for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+        Map<String, Attribute<?>> simpleAttributeNameToAttribute = Maps.newHashMap();
+        CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, simpleAttributeNameToAttribute);
+
+        Set<Attribute<Object>> attributes = AttributeInitializer.getAttributesNoSave(contextClass);
+
+        LOG.debug("context class {} and attributes {}", contextClass, attributes);
+
+        for (Attribute<Object> attribute: attributes) {
+          simpleAttributeNameToAttribute.put(AttributeParseUtils.getSimpleName(attribute), attribute);
+        }
+      }
+    }
+
+    private ContextUtils()
+    {
+    }
+
+    /**
+     * This method is only used for testing.
+     *
+     * @param contextClass
+     * @param attribute
+     */
+    @VisibleForTesting
+    protected static void addAttribute(Class<? extends Context> contextClass, Attribute<?> attribute)
+    {
+      Set<String> attributeNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
+
+      if (attributeNames == null) {
+        attributeNames = Sets.newHashSet();
+        CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, attributeNames);
+      }
+
+      attributeNames.add(attribute.getSimpleName());
+
+      CONTEXT_CLASSES.add(contextClass);
+      Map<String, Attribute<?>> attributeMap = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass);
+
+      if (attributeMap == null) {
+        attributeMap = Maps.newHashMap();
+        CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, attributeMap);
+      }
+
+      attributeMap.put(attribute.getSimpleName(), attribute);
+    }
+
+    /**
+     * This method is only used for testing.
+     *
+     * @param contextClass
+     * @param attribute
+     */
+    @VisibleForTesting
+    protected static void removeAttribute(Class<? extends Context> contextClass, Attribute<?> attribute)
+    {
+      Set<String> attributeNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
+
+      if (attributeNames != null) {
+        attributeNames.remove(attribute.getSimpleName());
+
+        if (attributeNames.isEmpty()) {
+          CONTEXT_CLASS_TO_ATTRIBUTES.remove(contextClass);
+        }
+      }
+
+      if (!CONTEXT_CLASS_TO_ATTRIBUTES.keySet().contains(contextClass)) {
+        CONTEXT_CLASSES.remove(contextClass);
+      }
+
+      Map<String, Attribute<?>> attributeMap = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass);
+
+      if (attributeMap != null) {
+        attributeMap.remove(attribute.getSimpleName());
+
+        if (attributeMap.isEmpty()) {
+          CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.remove(contextClass);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Utility class that holds methods for parsing.
+   */
+  protected static class AttributeParseUtils
+  {
+    public static final Set<String> ALL_SIMPLE_ATTRIBUTE_NAMES;
+
+    static {
+      ALL_SIMPLE_ATTRIBUTE_NAMES = Sets.newHashSet();
+
+      initialize();
+    }
+
+    public static void initialize()
+    {
+      ALL_SIMPLE_ATTRIBUTE_NAMES.clear();
+
+      for (Map.Entry<Class<? extends Context>, Set<String>> entry: ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.entrySet()) {
+        ALL_SIMPLE_ATTRIBUTE_NAMES.addAll(entry.getValue());
+      }
+    }
+
+    private AttributeParseUtils()
+    {
+    }
+
+    /**
+     * This method creates all the appropriate child {@link Conf}s of the given parent {@link Conf} and adds the given
+     * attribute to the parent {@link Conf} if appropriate as well as all the child {@link Conf}s of the parent if
+     * appropriate.
+     *
+     * @param conf The parent {@link Conf}.
+     * @param attributeName The simple name of the attribute to add.
+     * @param attrValue The value of the attribute.
+     */
+    protected static void processAllConfsForAttribute(Conf conf, String attributeName, String attrValue)
+    {
+      ConfElement confElement = conf.getConfElement();
+
+      LOG.debug("Current confElement {} and name {}", confElement.getStramElement(), conf.getId());
+
+      if (confElement.getContextAttributes().contains(attributeName)) {
+        LOG.debug("Adding attribute");
+        @SuppressWarnings("unchecked")
+        Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confElement.getContextClass()).get(attributeName);
+        conf.setAttribute(attr, attrValue);
+      }
+
+      for (ConfElement childConfElement: confElement.getChildren()) {
+
+        if (!childConfElement.getAllChildAttributes().contains(attributeName)) {
+          continue;
+        }
+
+        Conf childConf = addConf(childConfElement.getStramElement(), WILDCARD, conf);
+        processAllConfsForAttribute(childConf, attributeName, attrValue);
+      }
+    }
+
+    /**
+     * This extracts the name of an attribute from the given set of keys.
+     *
+     * @param element The {@link StramElement} corresponding to the current element being parsed.
+     * @param keys The split keys that are being parsed.
+     * @param index The current key that the parser is on.
+     * @return The fully qualified name of an attribute, or just its simple name.
+     */
+    public static String getAttributeName(StramElement element, String[] keys, int index)
+    {
+
+      if (element != null && element != StramElement.ATTR) {
+        throw new IllegalArgumentException("The given "
+                                           + StramElement.class
+                                           + " must either have a value of null or "
+                                           + StramElement.ATTR
+                                           + " but it had a value of " + element);
+      }
+
+      String attributeName;
+
+      if (element == StramElement.ATTR) {
+        attributeName = getCompleteKey(keys, index + 1);
+      } else {
+        attributeName = getCompleteKey(keys, index);
+      }
+
+      return attributeName;
+    }
+
+    /**
+     * This method checks to see if the attribute name is simple or is prefixed with the FQCN of the {@link Context}
+     * class which contains it.
+     *
+     * @param attributeName The attribute name to check.
+     * @return True if the attribute name is simple. False otherwise.
+     */
+    public static boolean isSimpleAttributeName(String attributeName)
+    {
+      return !attributeName.contains(KEY_SEPARATOR);
+    }
+
+    /**
+     * Gets the {@link Context} class that the given attributeName belongs to.
+     *
+     * @param attributeName The {@link Attribute} name whose {@link Context} class needs to be
+     * discovered.
+     * @return The {@link Context} class that the given {@link Attribute} name belongs to.
+     */
+    @SuppressWarnings("unchecked")
+    public static Class<? extends Context> getContainingContextClass(String attributeName)
+    {
+      if (isSimpleAttributeName(attributeName)) {
+        throw new IllegalArgumentException("The given attribute name "
+                                           + attributeName
+                                           + " is simple.");
+      }
+
+      LOG.debug("Attribute Name {}", attributeName);
+
+      int lastSeparator = attributeName.lastIndexOf(KEY_SEPARATOR);
+      String contextClassName = attributeName.substring(0, lastSeparator);
+
+      int lastPeriod = contextClassName.lastIndexOf(KEY_SEPARATOR);
+
+      StringBuilder sb = new StringBuilder(contextClassName);
+      sb.setCharAt(lastPeriod, '$');
+      contextClassName = sb.toString();
+
+      Class<? extends Context> contextClass;
+
+      try {
+        Class<?> clazz = Class.forName(contextClassName);
+
+        if (Context.class.isAssignableFrom(clazz)) {
+          contextClass = (Class<? extends Context>)clazz;
+        } else {
+          throw new IllegalArgumentException("The provided context class name "
+                                             + contextClassName
+                                             + " is not valid.");
+        }
+      } catch (ClassNotFoundException ex) {
+        throw new IllegalArgumentException(ex);
+      }
+
+      String simpleAttributeName = getSimpleAttributeName(attributeName);
+
+      if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(simpleAttributeName)) {
+        throw new ValidationException(simpleAttributeName
+                                      + " is not a valid attribute of "
+                                      + contextClass);
+      }
+
+      return contextClass;
+    }
+
+    /**
+     * This extracts the simple {@link Attribute} name from the given {@link Attribute} name.
+     *
+     * @param attributeName The attribute name to extract a simple attribute name from.
+     * @return The simple attribute name.
+     */
+    public static String getSimpleAttributeName(String attributeName)
+    {
+      if (isSimpleAttributeName(attributeName)) {
+        return attributeName;
+      }
+
+      if (attributeName.endsWith(KEY_SEPARATOR)) {
+        throw new IllegalArgumentException("The given attribute name ends with \""
+                                           + KEY_SEPARATOR
+                                           + "\" so a simple name cannot be extracted.");
+      }
+
+      return attributeName.substring(attributeName.lastIndexOf(KEY_SEPARATOR) + 1, attributeName.length());
+    }
+
+    /**
+     * Gets the simple name of an {@link Attribute}, which does not include the FQCN of the {@link Context} class
+     * which contains it.
+     *
+     * @param attribute The {@link Attribute} of interest.
+     * @return The simple name of the given {@link Attribute}.
+     */
+    public static String getSimpleName(Attribute<?> attribute)
+    {
+      return getSimpleAttributeName(attribute.name);
+    }
+
+  }
+
   public class JSONObject2String implements StringCodec<Object>, Serializable
   {
     private static final long serialVersionUID = -664977453308585878L;
@@ -132,11 +908,11 @@ public class LogicalPlanConfiguration {
     @Override
     public Object fromString(String jsonObj)
     {
-      
+      LOG.debug("JONString {}", jsonObj);
       ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer();
       try {
         return mapper.readValue(jsonObj, Object.class);
-      } catch (Exception e) {
+      } catch (IOException e) {
         throw new RuntimeException("Error parsing json content", e);
       }
     }
@@ -147,13 +923,16 @@ public class LogicalPlanConfiguration {
       ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer();
       try {
         return mapper.writeValueAsString(pojo);
-      } catch (Exception e) {
+      } catch (IOException e) {
         throw new RuntimeException("Error writing object as json", e);
       }
     }
-    
+
   }
 
+  /**
+   * This is an abstract class representing the configuration applied to an element in the DAG.
+   */
   private static abstract class Conf {
 
     protected Conf parentConf = null;
@@ -184,7 +963,7 @@ public class LogicalPlanConfiguration {
 
     @SuppressWarnings("unchecked")
     public <T extends Conf> T getAncestorConf(StramElement ancestorElement) {
-      if (getElement() == ancestorElement) {
+      if (getConfElement().getStramElement() == ancestorElement) {
         return (T)this;
       }
       if (parentConf == null) {
@@ -221,7 +1000,7 @@ public class LogicalPlanConfiguration {
     }
 
     public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType) {
-      List<T> childConfs = new ArrayList<T>();
+      List<T> childConfs = new ArrayList<>();
       Map<String, T> elChildren = getChildren(childType);
       for (Map.Entry<String, T> entry : elChildren.entrySet()) {
         String key = entry.getKey();
@@ -267,14 +1046,18 @@ public class LogicalPlanConfiguration {
           conf = declaredConstructor.newInstance(new Object[] {});
           conf.setId(id);
           map.put(id, conf);
-        } catch (Exception e) {
+        } catch (IllegalAccessException |
+                 IllegalArgumentException |
+                 InstantiationException |
+                 NoSuchMethodException |
+                 SecurityException |
+                 InvocationTargetException e) {
           LOG.error("Error instantiating configuration", e);
         }
       }
       return conf;
     }
 
-    //public abstract Conf getChild(String id, StramElement childType);
     public  <T extends Conf> T getChild(String id, StramElement childType) {
       T conf = null;
       @SuppressWarnings("unchecked")
@@ -290,7 +1073,7 @@ public class LogicalPlanConfiguration {
       // Always return non null so caller will not have to do extra check as expected
       Map<String, T> elChildren = (Map<String, T>)children.get(childType);
       if (elChildren == null) {
-        elChildren = new HashMap<String, T>();
+        elChildren = Maps.newHashMap();
         children.put(childType, elChildren);
       }
       return elChildren;
@@ -301,10 +1084,6 @@ public class LogicalPlanConfiguration {
     public void parseElement(StramElement element, String[] keys, int index, String propertyValue) {
     }
 
-    public Class<? extends Context> getAttributeContextClass() {
-      return null;
-    }
-
     public boolean isAllowedChild(StramElement childType) {
       StramElement[] childElements = getChildElements();
       if (childElements != null) {
@@ -318,7 +1097,7 @@ public class LogicalPlanConfiguration {
     }
 
     public StramElement getDefaultChildElement() {
-      if ((getAttributeContextClass() == null) && isAllowedChild(StramElement.PROP)) {
+      if ((getConfElement().getContextClass() == null) && isAllowedChild(StramElement.PROP)) {
         return StramElement.PROP;
       }
       return null;
@@ -330,8 +1109,7 @@ public class LogicalPlanConfiguration {
 
     public abstract StramElement[] getChildElements();
 
-    public abstract StramElement getElement();
-
+    public abstract ConfElement getConfElement();
   }
 
   private static class StramConf extends Conf {
@@ -345,21 +1123,20 @@ public class LogicalPlanConfiguration {
     }
 
     @Override
-    public StramElement getElement()
+    public StramElement[] getChildElements()
     {
-      return null;
+      return CHILD_ELEMENTS;
     }
 
     @Override
-    public StramElement[] getChildElements()
+    public ConfElement getConfElement()
     {
-      return CHILD_ELEMENTS;
+      return ConfElement.STRAM;
     }
-
   }
 
   /**
-   * App configuration
+   * This holds the configuration information for an Apex application.
    */
   private static class AppConf extends Conf {
 
@@ -372,12 +1149,6 @@ public class LogicalPlanConfiguration {
     }
 
     @Override
-    public StramElement getElement()
-    {
-      return StramElement.APPLICATION;
-    }
-
-    @Override
     public void parseElement(StramElement element, String[] keys, int index, String propertyValue) {
       if ((element == StramElement.CLASS) || (element == StramElement.PATH)) {
         StramConf stramConf = getParentConf();
@@ -391,17 +1162,17 @@ public class LogicalPlanConfiguration {
       return CHILD_ELEMENTS;
     }
 
-    @Override
-    public Class<? extends Context> getAttributeContextClass()
-    {
-      return Context.DAGContext.class;
-    }
 
     @Override
     public StramElement getDefaultChildElement() {
       return StramElement.PROP;
     }
 
+    @Override
+    public ConfElement getConfElement()
+    {
+      return ConfElement.APPLICATION;
+    }
   }
 
   private static class GatewayConf extends Conf {
@@ -419,11 +1190,10 @@ public class LogicalPlanConfiguration {
     }
 
     @Override
-    public StramElement getElement()
+    public ConfElement getConfElement()
     {
-      return StramElement.GATEWAY;
+      return ConfElement.GATEWAY;
     }
-
   }
 
   /**
@@ -444,11 +1214,10 @@ public class LogicalPlanConfiguration {
       return CHILD_ELEMENTS;
     }
 
-
     @Override
-    public StramElement getElement()
+    public ConfElement getConfElement()
     {
-      return StramElement.TEMPLATE;
+      return ConfElement.TEMPLATE;
     }
 
     @Override
@@ -472,23 +1241,23 @@ public class LogicalPlanConfiguration {
   }
 
   /**
-   *
+   * This holds the configuration information for a stream that connects two operators in an Apex application.
    */
   private static class StreamConf extends Conf {
 
     private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.TEMPLATE, StramElement.PROP};
 
     private OperatorConf sourceNode;
-    private final Set<OperatorConf> targetNodes = new HashSet<OperatorConf>();
+    private final Set<OperatorConf> targetNodes = new HashSet<>();
 
     @SuppressWarnings("unused")
     StreamConf() {
     }
 
     @Override
-    public StramElement getElement()
+    public ConfElement getConfElement()
     {
-      return StramElement.STREAM;
+      return ConfElement.STREAM;
     }
 
     /**
@@ -534,7 +1303,6 @@ public class LogicalPlanConfiguration {
           throw new IllegalArgumentException("Duplicate " + name);
         }
         String[] parts = getNodeAndPortId(value);
-        //setSource(parts[1], getOrAddOperator(appConf, parts[0]));
         setSource(parts[1], appConf.getOrAddChild(parts[0], StramElement.OPERATOR, OperatorConf.class));
       } else if (STREAM_SINKS.equals(name)) {
         String[] targetPorts = value.split(",");
@@ -575,7 +1343,7 @@ public class LogicalPlanConfiguration {
   }
 
   /**
-   *
+   * This is a simple extension of {@link java.util.Properties} which allows you to specify default properties.
    */
   private static class PropertiesWithModifiableDefaults extends Properties {
     private static final long serialVersionUID = -4675421720308249982L;
@@ -589,7 +1357,7 @@ public class LogicalPlanConfiguration {
   }
 
   /**
-   * Operator configuration
+   * This holds the configuration information for an operator in an Apex application.
    */
   private static class OperatorConf extends Conf {
 
@@ -599,14 +1367,14 @@ public class LogicalPlanConfiguration {
     @SuppressWarnings("unused")
     OperatorConf() {
     }
-    private final Map<String, StreamConf> inputs = new HashMap<String, StreamConf>();
-    private final Map<String, StreamConf> outputs = new HashMap<String, StreamConf>();
+    private final Map<String, StreamConf> inputs = Maps.newHashMap();
+    private final Map<String, StreamConf> outputs = Maps.newHashMap();
     private String templateRef;
 
     @Override
-    public StramElement getElement()
+    public ConfElement getConfElement()
     {
-      return StramElement.OPERATOR;
+      return ConfElement.OPERATOR;
     }
 
     @Override
@@ -651,12 +1419,6 @@ public class LogicalPlanConfiguration {
       return StramElement.PROP;
     }
 
-    @Override
-    public Class<? extends Context> getAttributeContextClass()
-    {
-      return OperatorContext.class;
-    }
-
     /**
      *
      * @return String
@@ -671,7 +1433,7 @@ public class LogicalPlanConfiguration {
   }
 
   /**
-   * Port configuration
+   * This holds the configuration information for a port on an operator in an Apex application.
    */
   private static class PortConf extends Conf {
 
@@ -682,23 +1444,16 @@ public class LogicalPlanConfiguration {
     }
 
     @Override
-    public StramElement getElement()
-    {
-      return StramElement.PORT;
-    }
-
-    @Override
     public StramElement[] getChildElements()
     {
       return CHILD_ELEMENTS;
     }
 
     @Override
-    public Class<? extends Context> getAttributeContextClass()
+    public ConfElement getConfElement()
     {
-      return PortContext.class;
+      return ConfElement.PORT;
     }
-
   }
 
   private static final Map<StramElement, Class<? extends Conf>> elementMaps = Maps.newHashMap();
@@ -715,8 +1470,8 @@ public class LogicalPlanConfiguration {
     elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class);
   }
 
-  private Conf getConf(StramElement element, Conf ancestorConf) {
-    if (element == ancestorConf.getElement()) {
+  private static Conf getConf(StramElement element, Conf ancestorConf) {
+    if (element == ancestorConf.getConfElement().getStramElement()) {
       return ancestorConf;
     }
     // If top most element is reached and didnt match ancestor conf
@@ -724,13 +1479,13 @@ public class LogicalPlanConfiguration {
     if (element == null) {
       return null;
     }
-    StramElement parentElement = getAllowedParentElement(element, ancestorConf);
+    StramElement parentElement = ConfElement.getAllowedParentConf(element);
     Conf parentConf = getConf(parentElement, ancestorConf);
     return parentConf.getOrAddChild(WILDCARD, element, elementMaps.get(element));
   }
 
-  private Conf addConf(StramElement element, String name, Conf ancestorConf) {
-    StramElement parentElement = getAllowedParentElement(element, ancestorConf);
+  private static Conf addConf(StramElement element, String name, Conf ancestorConf) {
+    StramElement parentElement = ConfElement.getAllowedParentConf(element);
     Conf conf1 = null;
     Conf parentConf = getConf(parentElement, ancestorConf);
     if (parentConf != null) {
@@ -739,28 +1494,8 @@ public class LogicalPlanConfiguration {
     return conf1;
   }
 
-  private StramElement getAllowedParentElement(StramElement element, Conf ancestorConf) {
-    StramElement parentElement = null;
-    if ((element == StramElement.APPLICATION)) {
-      parentElement = null;
-    } else if ((element == StramElement.GATEWAY) || (element == StramElement.OPERATOR) || (element == StramElement.STREAM)) {
-      parentElement = StramElement.APPLICATION;
-    } else if ((element == StramElement.PORT) || (element == StramElement.INPUT_PORT) || (element == StramElement.OUTPUT_PORT)) {
-      parentElement = StramElement.OPERATOR;
-    } else if (element == StramElement.TEMPLATE) {
-      parentElement = null;
-    }
-    return parentElement;
-  }
-
-  /*
-  private boolean isApplicationTypeConf(Conf conf) {
-    return (conf.getElement() == null) || (conf.getElement() == StramElement.APPLICATION);
-  }
-  */
-
   private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType) {
-    List<T> childConfs = new ArrayList<T>();
+    List<T> childConfs = Lists.newArrayList();
     for (Conf conf1 : confs) {
       List<T> matchingConfs = conf1.getMatchingChildConf(name, childType);
       childConfs.addAll(matchingConfs);
@@ -813,7 +1548,7 @@ public class LogicalPlanConfiguration {
   public String getAppAlias(String appPath) {
     String appAlias;
     if (appPath.endsWith(CLASS_SUFFIX)) {
-      appPath = appPath.replace("/", ".").substring(0, appPath.length() - CLASS_SUFFIX.length());
+      appPath = appPath.replace("/", KEY_SEPARATOR).substring(0, appPath.length() - CLASS_SUFFIX.length());
     }
     appAlias = stramConf.appAliases.get(appPath);
     if (appAlias == null) {
@@ -836,11 +1571,11 @@ public class LogicalPlanConfiguration {
     JSONArray operatorArray = json.getJSONArray("operators");
     for (int i = 0; i < operatorArray.length(); i++) {
       JSONObject operator = operatorArray.getJSONObject(i);
-      String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + "." + operator.getString("name") + ".";
+      String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + ".";
       prop.setProperty(operatorPrefix + "classname", operator.getString("class"));
       JSONObject operatorProperties = operator.optJSONObject("properties");
       if (operatorProperties != null) {
-        String propertiesPrefix = operatorPrefix + StramElement.PROP.getValue() + ".";
+        String propertiesPrefix = operatorPrefix + StramElement.PROP.getValue() + KEY_SEPARATOR;
         @SuppressWarnings("unchecked")
         Iterator<String> iter = operatorProperties.keys();
         while (iter.hasNext()) {
@@ -850,7 +1585,7 @@ public class LogicalPlanConfiguration {
       }
       JSONObject operatorAttributes = operator.optJSONObject("attributes");
       if (operatorAttributes != null) {
-        String attributesPrefix = operatorPrefix + StramElement.ATTR.getValue() + ".";
+        String attributesPrefix = operatorPrefix + StramElement.ATTR.getValue() + KEY_SEPARATOR;
         @SuppressWarnings("unchecked")
         Iterator<String> iter = operatorAttributes.keys();
         while (iter.hasNext()) {
@@ -860,12 +1595,12 @@ public class LogicalPlanConfiguration {
       }
       JSONArray portArray = operator.optJSONArray("ports");
       if (portArray != null) {
-        String portsPrefix = operatorPrefix + StramElement.PORT.getValue() + ".";
+        String portsPrefix = operatorPrefix + StramElement.PORT.getValue() + KEY_SEPARATOR;
         for (int j = 0; j < portArray.length(); j++) {
           JSONObject port = portArray.getJSONObject(j);
           JSONObject portAttributes = port.optJSONObject("attributes");
           if (portAttributes != null) {
-            String portAttributePrefix = portsPrefix + port.getString("name") + "." + StramElement.ATTR.getValue() + ".";
+            String portAttributePrefix = portsPrefix + port.getString("name") + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR;
             @SuppressWarnings("unchecked")
             Iterator<String> iter = portAttributes.keys();
             while (iter.hasNext()) {
@@ -876,10 +1611,10 @@ public class LogicalPlanConfiguration {
         }
       }
     }
-    
+
     JSONObject appAttributes = json.optJSONObject("attributes");
     if (appAttributes != null) {
-      String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + ".";
+      String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR;
       @SuppressWarnings("unchecked")
       Iterator<String> iter = appAttributes.keys();
       while (iter.hasNext()) {
@@ -892,9 +1627,9 @@ public class LogicalPlanConfiguration {
     for (int i = 0; i < streamArray.length(); i++) {
       JSONObject stream = streamArray.getJSONObject(i);
       String name = stream.optString("name", "stream-" + i);
-      String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + "." + name + ".";
+      String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR;
       JSONObject source = stream.getJSONObject("source");
-      prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + "." + source.getString("portName"));
+      prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + KEY_SEPARATOR + source.getString("portName"));
       JSONArray sinks = stream.getJSONArray("sinks");
       StringBuilder sinkPropertyValue = new StringBuilder();
       for (int j = 0; j < sinks.length(); j++) {
@@ -902,7 +1637,7 @@ public class LogicalPlanConfiguration {
           sinkPropertyValue.append(",");
         }
         JSONObject sink = sinks.getJSONObject(j);
-        sinkPropertyValue.append(sink.getString("operatorName")).append(".").append(sink.getString("portName"));
+        sinkPropertyValue.append(sink.getString("operatorName")).append(KEY_SEPARATOR).append(sink.getString("portName"));
       }
       prop.setProperty(streamPrefix + STREAM_SINKS, sinkPropertyValue.toString());
       String locality = stream.optString("locality", null);
@@ -937,13 +1672,21 @@ public class LogicalPlanConfiguration {
       String propertyValue = props.getProperty(propertyName);
       this.properties.setProperty(propertyName, propertyValue);
       if (propertyName.startsWith(StreamingApplication.DT_PREFIX)) {
-        String[] keyComps = propertyName.split("\\.");
+        String[] keyComps = propertyName.split(KEY_SEPARATOR_SPLIT_REGEX);
         parseStramPropertyTokens(keyComps, 1, propertyName, propertyValue, stramConf);
       }
     }
     return this;
   }
 
+  /**
+   * This method is used to parse an Apex property name.
+   * @param keys The keys into which an Apex property name is split.
+   * @param index The current index that the parser is on for processing the property name.
+   * @param propertyName The original unsplit Apex property name.
+   * @param propertyValue The value corresponding to the Apex property.
+   * @param conf The {@link Conf} against which the keys from the current index onward are resolved.
+   */
   private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf) {
     if (index < keys.length) {
       String key = keys[index];
@@ -973,14 +1716,65 @@ public class LogicalPlanConfiguration {
           LOG.error("Invalid configuration key: {}", propertyName);
         }
       } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) {
-        if (conf.getElement() == null) {
+        String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
+
+        if (element != StramElement.ATTR) {
+          String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
+          LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
+        }
+
+        if (conf.getConfElement().getStramElement() == null) {
           conf = addConf(StramElement.APPLICATION, WILDCARD, conf);
         }
+
         if (conf != null) {
-          // Supporting current implementation where attribute can be directly specified under stram
-          // Re-composing complete key for nested keys which are used in templates
-          // Implement it better way to not pre-tokenize the property string and parse progressively
-          parseAttribute(conf, keys, index, element, propertyValue);
+          if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
+            //The provided attribute name was a simple name
+
+            if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
+              throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
+            }
+
+            if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
+              throw new ValidationException(attributeName
+                                            + " is not defined for the "
+                                            + conf.getConfElement().getStramElement()
+                                            + " or any of its child configurations.");
+            }
+
+            if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
+              //If the attribute name is ambiguous at this configuration level we should tell the user.
+              LOG.warn("The attribute "
+                       + attributeName
+                       + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
+            }
+
+            if (conf.getConfElement().getContextAttributes().contains(attributeName)) {
+              @SuppressWarnings("unchecked")
+              Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
+              conf.setAttribute(attr, propertyValue);
+            } else {
+              AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue);
+            }
+          } else {
+            //This is a FQ attribute name
+            Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
+
+            //Convert to a simple name
+            attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
+
+            if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
+              throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
+            }
+
+            ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
+
+            conf = ConfElement.addConfs(conf, confWithAttr);
+
+            @SuppressWarnings("unchecked")
+            Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
+            conf.setAttribute(attr, propertyValue);
+          }
         } else {
           LOG.error("Invalid configuration key: {}", propertyName);
         }
@@ -992,12 +1786,6 @@ public class LogicalPlanConfiguration {
           prop = getCompleteKey(keys, index+1);
         } else {
           prop = getCompleteKey(keys, index);
-          /*
-          if (conf.getAttributeContextClass() != null) {
-            LOG.warn("Please specify the property {} using the {} keyword as {}", prop, StramElement.PROP.getValue(),
-                getCompleteKey(keys, 0, index) + "." + StramElement.PROP.getValue() + "." + getCompleteKey(keys, index));
-          }
-          */
         }
         if (prop != null) {
           conf.setProperty(prop, propertyValue);
@@ -1023,15 +1811,30 @@ public class LogicalPlanConfiguration {
     return element;
   }
 
-  private String getCompleteKey(String[] keys, int start) {
+  /**
+   * This constructs a string from the keys in the given keys array starting from
+   * the start index inclusive until the end of the array.
+   * @param keys The keys from which to construct a string.
+   * @param start The index of the first token to include in the string.
+   * @return The completed key.
+   */
+  private static String getCompleteKey(String[] keys, int start) {
     return getCompleteKey(keys, start, keys.length);
   }
 
-  private String getCompleteKey(String[] keys, int start, int end) {
+  /**
+   * This constructs a string from the keys in the given keys array starting from
+   * the start index inclusive until the specified end index exclusive.
+   * @param keys The keys from which to construct a string.
+   * @param start The index of the first token to include in the string.
+   * @param end 1 + the last index to include in the concatenation.
+   * @return The completed key.
+   */
+  private static String getCompleteKey(String[] keys, int start, int end) {
     StringBuilder sb = new StringBuilder(1024);
     for (int i = start; i < end; ++i) {
       if (i > start) {
-        sb.append(".");
+        sb.append(KEY_SEPARATOR);
       }
       sb.append(keys[i]);
     }
@@ -1099,7 +1902,7 @@ public class LogicalPlanConfiguration {
 
     Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR);
 
-    Map<OperatorConf, Operator> nodeMap = new HashMap<OperatorConf, Operator>(operators.size());
+    Map<OperatorConf, Operator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
     // add all operators first
     for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) {
       OperatorConf nodeConf = nodeConfEntry.getValue();
@@ -1117,7 +1920,7 @@ public class LogicalPlanConfiguration {
             nd = dag.addOperator(nodeConfEntry.getKey(), nodeClass);
           }
           setOperatorProperties(nd, nodeConf.getProperties());
-        } catch (Exception e) {
+        } catch (IOException e) {
           throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e);
         }
         nodeMap.put(nodeConf, nd);
@@ -1178,9 +1981,9 @@ public class LogicalPlanConfiguration {
   /**
    * Populate the logical plan from the streaming application definition and configuration.
    * Configuration is resolved based on application alias, if any.
-   * @param app
-   * @param dag
-   * @param name
+   * @param app The {@link StreamingApplication} to be run.
+   * @param dag This will hold the {@link LogicalPlan} representation of the given {@link StreamingApplication}.
+   * @param name The path of the application class in the jar.
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
@@ -1211,10 +2014,6 @@ public class LogicalPlanConfiguration {
     return props;
   }
 
-  private String getSimpleName(Attribute<?> attribute) {
-    return attribute.name.substring(attribute.name.lastIndexOf('.')+1);
-  }
-
   /**
    * Get the configuration opProps for the given operator.
    * These can be operator specific settings or settings from matching templates.
@@ -1229,7 +2028,7 @@ public class LogicalPlanConfiguration {
   }
 
   private Map<String,String> getApplicationProperties(List<AppConf> appConfs){
-    Map<String, String> appProps = new HashMap<String, String>();
+    Map<String, String> appProps = Maps.newHashMap();
     // Apply the configurations in reverse order since the higher priority ones are at the beginning
     for(int i = appConfs.size()-1; i >= 0; i--){
       AppConf conf1 = appConfs.get(i);
@@ -1246,7 +2045,7 @@ public class LogicalPlanConfiguration {
    */
   private Map<String, String> getProperties(OperatorMeta ow, List<OperatorConf> opConfs, String appName)
   {
-    Map<String, String> opProps = new HashMap<String, String>();
+    Map<String, String> opProps = Maps.newHashMap();
     Map<String, TemplateConf> templates = stramConf.getChildren(StramElement.TEMPLATE);
     // list of all templates that match operator, ordered by priority
     if (!templates.isEmpty()) {
@@ -1269,12 +2068,11 @@ public class LogicalPlanConfiguration {
       Conf conf1 = opConfs.get(i);
       opProps.putAll(Maps.fromProperties(conf1.properties));
     }
-    //properties.remove(OPERATOR_CLASSNAME);
     return opProps;
   }
 
   private List<TemplateConf> getDirectTemplates(List<OperatorConf> opConfs, Map<String, TemplateConf> templates) {
-    List<TemplateConf> refTemplates = new ArrayList<TemplateConf>();
+    List<TemplateConf> refTemplates = Lists.newArrayList();
     for (TemplateConf t : templates.values()) {
       for (OperatorConf opConf : opConfs) {
         if (t.id.equals(opConf.templateRef)) {
@@ -1293,13 +2091,8 @@ public class LogicalPlanConfiguration {
    * @return TreeMap<Integer, TemplateConf>
    */
   private TreeMap<Integer, TemplateConf> getMatchingTemplates(OperatorMeta ow, String appName, Map<String, TemplateConf> templates) {
-    TreeMap<Integer, TemplateConf> tm = new TreeMap<Integer, TemplateConf>();
+    TreeMap<Integer, TemplateConf> tm = Maps.newTreeMap();
     for (TemplateConf t : templates.values()) {
-      /*if (t.id == nodeConf.templateRef) {
-        // directly assigned applies last
-        tm.put(1, t);
-        continue;
-      } else*/
       if ((t.idRegExp != null && ow.getName().matches(t.idRegExp))) {
         tm.put(1, t);
       } else if (appName != null && t.appNameRegExp != null
@@ -1326,10 +2119,7 @@ public class LogicalPlanConfiguration {
       BeanUtils.populate(operator, properties);
       return operator;
     }
-    catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Error setting operator properties", e);
-    }
-    catch (InvocationTargetException e) {
+    catch (IllegalAccessException | InvocationTargetException e) {
       throw new IllegalArgumentException("Error setting operator properties", e);
     }
   }
@@ -1340,10 +2130,7 @@ public class LogicalPlanConfiguration {
       BeanUtils.populate(application, properties);
       return application;
     }
-    catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Error setting application properties", e);
-    }
-    catch (InvocationTargetException e) {
+    catch (IllegalAccessException | InvocationTargetException e) {
       throw new IllegalArgumentException("Error setting application properties", e);
     }
   }
@@ -1369,21 +2156,7 @@ public class LogicalPlanConfiguration {
       setOperatorProperties(ow.getOperator(), opProps);
     }
   }
-/*
-  private static final Map<String, Attribute<?>> legacyKeyMap = Maps.newHashMap();
 
-  static {
-    legacyKeyMap.put("appName", Context.DAGContext.APPLICATION_NAME);
-    legacyKeyMap.put("libjars", Context.DAGContext.LIBRARY_JARS);
-    legacyKeyMap.put("maxContainers", Context.DAGContext.CONTAINERS_MAX_COUNT);
-    legacyKeyMap.put("containerMemoryMB", Context.DAGContext.CONTAINER_MEMORY_MB);
-    legacyKeyMap.put("containerJvmOpts", Context.DAGContext.CONTAINER_JVM_OPTIONS);
-    legacyKeyMap.put("masterMemoryMB", Context.DAGContext.MASTER_MEMORY_MB);
-    legacyKeyMap.put("windowSizeMillis", Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
-    legacyKeyMap.put("appPath", Context.DAGContext.APPLICATION_PATH);
-    legacyKeyMap.put("allocateResourceTimeoutMillis", Context.DAGContext.RESOURCE_ALLOCATION_TIMEOUT_MILLIS);
-  }
-*/
   /**
    * Set the application configuration.
    * @param dag
@@ -1397,18 +2170,7 @@ public class LogicalPlanConfiguration {
   }
 
   private void setApplicationConfiguration(final LogicalPlan dag, List<AppConf> appConfs,StreamingApplication app) {
-    // Make the gateway address available as an application attribute
-//    for (Conf appConf : appConfs) {
-//      Conf gwConf = appConf.getChild(null, StramElement.GATEWAY);
-//      if (gwConf != null) {
-//        String gatewayAddress = gwConf.properties.getProperty(GATEWAY_LISTEN_ADDRESS_PROP);
-//        if (gatewayAddress != null) {
-//          dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, gatewayAddress);
-//          break;
-//        }
-//      }
-//    }
-    setAttributes(Context.DAGContext.class, appConfs, dag.getAttributes());
+    setAttributes(appConfs, dag.getAttributes());
     if (app != null) {
       Map<String, String> appProps = getApplicationProperties(appConfs);
       setApplicationProperties(app, appProps);
@@ -1420,7 +2182,7 @@ public class LogicalPlanConfiguration {
       List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR);
 
       // Set the operator attributes
-      setAttributes(OperatorContext.class, opConfs, ow.getAttributes());
+      setAttributes(opConfs, ow.getAttributes());
       // Set the operator opProps
       Map<String, String> opProps = getProperties(ow, opConfs, appName);
       setOperatorProperties(ow.getOperator(), opProps);
@@ -1432,7 +2194,7 @@ public class LogicalPlanConfiguration {
         // Add the generic port attributes as well
         List<PortConf> portConfs = getMatchingChildConf(opConfs, im.getPortName(), StramElement.PORT);
         inPortConfs.addAll(portConfs);
-        setAttributes(PortContext.class, inPortConfs, im.getAttributes());
+        setAttributes(inPortConfs, im.getAttributes());
       }
 
       for (Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : ow.getOutputStreams().entrySet()) {
@@ -1441,7 +2203,7 @@ public class LogicalPlanConfiguration {
         // Add the generic port attributes as well
         List<PortConf> portConfs = getMatchingChildConf(opConfs, om.getPortName(), StramElement.PORT);
         outPortConfs.addAll(portConfs);
-        setAttributes(PortContext.class, outPortConfs, om.getAttributes());
+        setAttributes(outPortConfs, om.getAttributes());
       }
       ow.populateAggregatorMeta();
     }
@@ -1460,36 +2222,7 @@ public class LogicalPlanConfiguration {
     }
   }
 
-  private final Map<Class<? extends Context>, Map<String, Attribute<Object>>> attributeMap = Maps.newHashMap();
-
-  private void parseAttribute(Conf conf, String[] keys, int index, StramElement element, String attrValue)
-  {
-    String configKey = (element == StramElement.ATTR) ? getCompleteKey(keys, index + 1) : getCompleteKey(keys, index);
-    boolean isDeprecated = false;
-    Class<? extends Context> clazz = conf.getAttributeContextClass();
-    Map<String, Attribute<Object>> m = attributeMap.get(clazz);
-    if (!attributeMap.containsKey(clazz)) {
-      Set<Attribute<Object>> attributes = AttributeInitializer.getAttributes(clazz);
-      m = Maps.newHashMapWithExpectedSize(attributes.size());
-      for (Attribute<Object> attr : attributes) {
-        m.put(getSimpleName(attr), attr);
-      }
-      attributeMap.put(clazz, m);
-    }
-    Attribute<Object> attr = m.get(configKey);
-    if (attr == null) {
-      throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
-    }
-
-    if (element != StramElement.ATTR || isDeprecated) {
-      String expName = getCompleteKey(keys, 0, index) + "." + StramElement.ATTR.getValue() +  "." + getSimpleName(attr);
-      LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
-    }
-
-    conf.setAttribute(attr, attrValue);
-  }
-
-  private void setAttributes(Class<?> clazz, List<? extends Conf> confs, Attribute.AttributeMap attributeMap) {
+  private void setAttributes(List<? extends Conf> confs, Attribute.AttributeMap attributeMap) {
     Set<Attribute<Object>> processedAttributes = Sets.newHashSet();
     //json object codec for complex attributes
     JSONObject2String jsonCodec = new JSONObject2String();