You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/24 04:37:58 UTC

[39/50] [abbrv] incubator-apex-core git commit: APEX-28 #resolve

APEX-28 #resolve

 - Rename of files requires a separate commit to preserve attribution.
 - Improved documentation
 - Added unit test to make sure that attributes declared in multiple contexts have the same type.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/977093e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/977093e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/977093e1

Branch: refs/heads/feature-module
Commit: 977093e171f1183985ae80d42b0d6dbc3af6cbc5
Parents: 434a717
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Aug 25 18:03:08 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Wed Sep 16 15:31:44 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/api/Attribute.java     |   11 +-
 .../main/java/com/datatorrent/api/Context.java  |   10 -
 .../stram/plan/logical/LogicalPlan.java         |    6 +-
 .../plan/logical/LogicalPlanConfiguration.java  |  472 +++--
 .../plan/LogicalPlanConfigurationTest.java      | 1788 ------------------
 .../datatorrent/stram/plan/LogicalPlanTest.java |  990 ----------
 .../logical/LogicalPlanConfigurationTest.java   | 1511 +++++++++++++++
 .../stram/plan/logical/LogicalPlanTest.java     |  988 ++++++++++
 .../src/test/resources/schemaTestTopology.json  |    2 +-
 9 files changed, 2785 insertions(+), 2993 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 4c16a2a..a7492b5 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -277,13 +277,6 @@ public class Attribute<T> implements Serializable
         if (map.containsKey(clazz)) {
           return 0;
         }
-
-        map.put(clazz, getAttributesNoSave(clazz));
-        return (long)clazz.getModifiers() << 32 | clazz.hashCode();
-      }
-
-      public static Set<Attribute<Object>> getAttributesNoSave(Class<?> clazz)
-      {
         Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
@@ -330,8 +323,8 @@ public class Attribute<T> implements Serializable
         catch (Exception ex) {
           DTThrowable.rethrow(ex);
         }
-
-        return set;
+        map.put(clazz, set);
+        return (long)clazz.getModifiers() << 32 | clazz.hashCode();
       }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index c2d974a..cd10398 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -33,16 +33,6 @@ import com.datatorrent.api.annotation.Stateless;
  */
 public interface Context
 {
-  /*
-   * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that
-   * Attribute is required to be the same accross all Context classes. This is required because if a simple attribute
-   * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If
-   * there were multiple Attributes specified in different Contexts with the same name, but a different type, then
-   * it would not be possible to set the values of Attributes specified by a simple attribute name in the root
-   * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context
-   * class would be a backwards incompatible change.
-   */
-
   /**
    * Get the attributes associated with this context.
    * The returned map does not contain any attributes that may have been defined in the parent context of this context.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 8826896..94d18ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1088,7 +1088,7 @@ public class LogicalPlan implements Serializable, DAG
       if (e.getKey().getOperatorWrapper() == om) {
          stream.sinks.remove(e.getKey());
       }
-      // If persistStream was enabled for stream, reset stream when sink removed 
+      // If persistStream was enabled for stream, reset stream when sink removed
       stream.resetStreamPersistanceOnSinkRemoval(e.getKey());
     }
     this.operators.remove(om.getName());
@@ -1431,11 +1431,11 @@ public class LogicalPlan implements Serializable, DAG
 
     for (StreamMeta s: streams.values()) {
       if (s.source == null) {
-        throw new ValidationException(String.format("stream source not connected: %s", s.getName()));
+        throw new ValidationException("Stream source not connected: " + s.getName());
       }
 
       if (s.sinks.isEmpty()) {
-        throw new ValidationException(String.format("stream sink not connected: %s", s.getName()));
+        throw new ValidationException("Stream sink not connected: " + s.getName());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index a3a18c2..7a53cd7 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.stram.plan.logical;
 
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -22,14 +23,17 @@ import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
 import java.util.*;
 import java.util.Map.Entry;
 
-import jline.internal.Preconditions;
 
 import javax.validation.ValidationException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -61,7 +65,6 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
 import com.datatorrent.stram.util.ObjectMapperFactory;
 
 /**
@@ -159,43 +162,16 @@ public class LogicalPlanConfiguration {
    */
   protected enum ConfElement
   {
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    STRAM(null,
-          null,
-          new HashSet<StramElement>(),
-          null),
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    APPLICATION(StramElement.APPLICATION,
-                STRAM,
-                new HashSet<StramElement>(),
-                DAGContext.class),
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    TEMPLATE(StramElement.TEMPLATE,
-             STRAM,
-             new HashSet<StramElement>(),
-             null),
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    GATEWAY(StramElement.GATEWAY,
-            ConfElement.APPLICATION,
-            new HashSet<StramElement>(),
-            null),
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    OPERATOR(StramElement.OPERATOR,
-             ConfElement.APPLICATION,
-             new HashSet<StramElement>(),
-             OperatorContext.class),
-    @SuppressWarnings("SetReplaceableByEnumSet")
-    STREAM(StramElement.STREAM,
-           ConfElement.APPLICATION,
-           new HashSet<StramElement>(),
-           null),
-    PORT(StramElement.PORT,
-         ConfElement.OPERATOR,
-         Sets.newHashSet(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT),
-         PortContext.class);
-
-    public static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
-    public static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
+    STRAM(null, null, null, null),
+    APPLICATION(StramElement.APPLICATION, STRAM, null, DAGContext.class),
+    TEMPLATE(StramElement.TEMPLATE, STRAM, null, null),
+    GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null),
+    OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class),
+    STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null),
+    PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class);
+
+    protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
+    protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
 
     static {
       initialize();
@@ -246,12 +222,8 @@ public class LogicalPlanConfiguration {
       }
 
       if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) {
-        throw new IllegalStateException("All the context classes "
-                                        + ContextUtils.CONTEXT_CLASSES
-                                        + " found in "
-                                        + Context.class
-                                        + " are not used by ConfElements "
-                                        + confElementContextClasses);
+        throw new IllegalStateException("All the context classes " + ContextUtils.CONTEXT_CLASSES + " found in "
+                                        + Context.class + " are not used by ConfElements " + confElementContextClasses);
       }
     }
 
@@ -312,16 +284,15 @@ public class LogicalPlanConfiguration {
       this.element = element;
       this.parent = parent;
 
-      this.allRelatedElements.addAll(additionalRelatedElements);
+      if (additionalRelatedElements != null) {
+        this.allRelatedElements.addAll(additionalRelatedElements);
+      }
+
       this.allRelatedElements.add(element);
 
       this.contextClass = contextClass;
 
-      if (contextClass != null) {
-        this.contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
-      } else {
-        this.contextAttributes = Sets.newHashSet();
-      }
+      this.contextAttributes = contextClass != null ? ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass) : new HashSet<String>();
     }
 
     private void setAllChildAttributes(Set<String> allChildAttributes)
@@ -445,8 +416,7 @@ public class LogicalPlanConfiguration {
      *
      * @param conf The current {@link Conf} type.
      * @return A path from the current {@link Conf} type to a root {@link Conf} type, which includes the current and root
-     * {
-     * @lin Conf} types.
+     * {@link Conf} types.
      */
     public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf)
     {
@@ -471,8 +441,7 @@ public class LogicalPlanConfiguration {
      *
      * @param conf The current {@link Conf} type.
      * @return A path from the root {@link Conf} type to the current {@link Conf} type, which includes the current and root
-     * {
-     * @lin Conf} types.
+     * {@link Conf} types.
      */
     public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf)
     {
@@ -487,11 +456,9 @@ public class LogicalPlanConfiguration {
      * @param child The current {@link Conf} type.
      * @param parent The parent {@link Conf} type.
      * @return A path from the current {@link Conf} type to a parent {@link Conf} type, which includes the current and parent
-     * {
-     * @lin Conf} types.
+     * {@link Conf} types.
      */
-    public static List<StramElement> getPathFromChildToParentInclusive(StramElement child,
-                                                                       StramElement parent)
+    public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, StramElement parent)
     {
       ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child);
 
@@ -528,11 +495,9 @@ public class LogicalPlanConfiguration {
      * @param child The current {@link Conf} type.
      * @param parent The parent {@link Conf} type.
      * @return A path from the parent {@link Conf} type to the current {@link Conf} type, which includes the current and parent
-     * {
-     * @lin Conf} types.
+     * {@link Conf} types.
      */
-    public static List<StramElement> getPathFromParentToChildInclusive(StramElement child,
-                                                                       StramElement parent)
+    public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, StramElement parent)
     {
       List<StramElement> path = getPathFromChildToParentInclusive(child,
                                                                   parent);
@@ -548,8 +513,7 @@ public class LogicalPlanConfiguration {
      * @return The {@link ConfElement} that contains the given attribute, or null if no {@link ConfElement} contains
      * the given attribute.
      */
-    public static ConfElement findConfElementWithAttribute(ConfElement current,
-                                                           String simpleAttributeName)
+    public static ConfElement findConfElementWithAttribute(ConfElement current, String simpleAttributeName)
     {
       if (current.getContextAttributes().contains(simpleAttributeName)) {
         return current;
@@ -573,9 +537,7 @@ public class LogicalPlanConfiguration {
       List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(childConfElement.getStramElement(),
                                                                               parentConf.getConfElement().getStramElement());
 
-      for (int pathIndex = 1;
-           pathIndex < path.size();
-           pathIndex++) {
+      for (int pathIndex = 1; pathIndex < path.size(); pathIndex++) {
         LOG.debug("Adding conf");
         StramElement pathElement = path.get(pathIndex);
         //Add the configurations we need to hold this attribute
@@ -593,12 +555,19 @@ public class LogicalPlanConfiguration {
   @SuppressWarnings("unchecked")
   protected static class ContextUtils
   {
-    public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES;
-    public static final Set<Class<? extends Context>> CONTEXT_CLASSES;
-    public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE;
+    private static final Map<String, Type> ATTRIBUTES_TO_TYPE = Maps.newHashMap();
+    public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
+    public static final Set<Class<? extends Context>> CONTEXT_CLASSES = Sets.newHashSet();
+    public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();
 
     static {
-      CONTEXT_CLASSES = Sets.newHashSet();
+      initialize();
+    }
+
+    @VisibleForTesting
+    protected static void initialize()
+    {
+      CONTEXT_CLASSES.clear();
 
       for (Class<?> clazz: Context.class.getDeclaredClasses()) {
         if (!Context.class.isAssignableFrom(clazz)) {
@@ -608,9 +577,17 @@ public class LogicalPlanConfiguration {
         CONTEXT_CLASSES.add((Class<? extends Context>)clazz);
       }
 
-      CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
+      buildAttributeMaps(CONTEXT_CLASSES);
+    }
 
-      for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+    @VisibleForTesting
+    protected static void buildAttributeMaps(Set<Class<? extends Context>> contextClasses)
+    {
+      CONTEXT_CLASS_TO_ATTRIBUTES.clear();
+      CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.clear();
+      ATTRIBUTES_TO_TYPE.clear();
+
+      for (Class<? extends Context> contextClass: contextClasses) {
         Set<String> contextAttributes = Sets.newHashSet();
 
         Field[] fields = contextClass.getDeclaredFields();
@@ -620,19 +597,29 @@ public class LogicalPlanConfiguration {
             continue;
           }
 
+          Type fieldType = ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0];
           contextAttributes.add(field.getName());
+
+          Type existingType = ATTRIBUTES_TO_TYPE.get(field.getName());
+
+          if (existingType != null && !existingType.equals(fieldType)) {
+            throw new ValidationException("The attribute " + field.getName() +
+                                          " is defined with two different types in two different context classes: " +
+                                          fieldType + " and " + existingType + "\n" +
+                                          "Attributes with the same name are required to have the same type accross all Context classes.");
+          }
+
+          ATTRIBUTES_TO_TYPE.put(field.getName(), fieldType);
         }
 
         CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, contextAttributes);
       }
 
-      CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();
-
-      for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+      for (Class<? extends Context> contextClass: contextClasses) {
         Map<String, Attribute<?>> simpleAttributeNameToAttribute = Maps.newHashMap();
         CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, simpleAttributeNameToAttribute);
 
-        Set<Attribute<Object>> attributes = AttributeInitializer.getAttributesNoSave(contextClass);
+        Set<Attribute<Object>> attributes = AttributeInitializer.getAttributes(contextClass);
 
         LOG.debug("context class {} and attributes {}", contextClass, attributes);
 
@@ -644,6 +631,7 @@ public class LogicalPlanConfiguration {
 
     private ContextUtils()
     {
+      //Private constructor to prevent instantiation of utility class
     }
 
     /**
@@ -735,6 +723,7 @@ public class LogicalPlanConfiguration {
 
     private AttributeParseUtils()
     {
+      //Private constructor to prevent instantiation of utility class
     }
 
     /**
@@ -782,11 +771,7 @@ public class LogicalPlanConfiguration {
     {
 
       if (element != null && element != StramElement.ATTR) {
-        throw new IllegalArgumentException("The given "
-                                           + StramElement.class
-                                           + " must either have a value of null or "
-                                           + StramElement.ATTR
-                                           + " but it had a value of " + element);
+        throw new IllegalArgumentException("The given " + StramElement.class + " must either have a value of null or " + StramElement.ATTR + " but it had a value of " + element);
       }
 
       String attributeName;
@@ -823,9 +808,7 @@ public class LogicalPlanConfiguration {
     public static Class<? extends Context> getContainingContextClass(String attributeName)
     {
       if (isSimpleAttributeName(attributeName)) {
-        throw new IllegalArgumentException("The given attribute name "
-                                           + attributeName
-                                           + " is simple.");
+        throw new IllegalArgumentException("The given attribute name " + attributeName + " is simple.");
       }
 
       LOG.debug("Attribute Name {}", attributeName);
@@ -847,9 +830,7 @@ public class LogicalPlanConfiguration {
         if (Context.class.isAssignableFrom(clazz)) {
           contextClass = (Class<? extends Context>)clazz;
         } else {
-          throw new IllegalArgumentException("The provided context class name "
-                                             + contextClassName
-                                             + " is not valid.");
+          throw new IllegalArgumentException("The provided context class name " + contextClassName + " is not valid.");
         }
       } catch (ClassNotFoundException ex) {
         throw new IllegalArgumentException(ex);
@@ -858,9 +839,7 @@ public class LogicalPlanConfiguration {
       String simpleAttributeName = getSimpleAttributeName(attributeName);
 
       if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(simpleAttributeName)) {
-        throw new ValidationException(simpleAttributeName
-                                      + " is not a valid attribute of "
-                                      + contextClass);
+        throw new ValidationException(simpleAttributeName + " is not a valid attribute of " + contextClass);
       }
 
       return contextClass;
@@ -879,9 +858,7 @@ public class LogicalPlanConfiguration {
       }
 
       if (attributeName.endsWith(KEY_SEPARATOR)) {
-        throw new IllegalArgumentException("The given attribute name ends with \""
-                                           + KEY_SEPARATOR
-                                           + "\" so a simple name cannot be extracted.");
+        throw new IllegalArgumentException("The given attribute name ends with \"" + KEY_SEPARATOR + "\" so a simple name cannot be extracted.");
       }
 
       return attributeName.substring(attributeName.lastIndexOf(KEY_SEPARATOR) + 1, attributeName.length());
@@ -961,6 +938,13 @@ public class LogicalPlanConfiguration {
       return (T)parentConf;
     }
 
+    /**
+     * Gets an ancestor {@link Conf} of this {@link Conf} of the given {@link StramElement} type.
+     * @param <T> The {@link Conf} Class of the ancestor conf
+     * @param ancestorElement The {@link StramElement} representing the type of the ancestor {@link Conf}.
+     * @return The ancestor {@link Conf} of the corresponding {@link StramElement} type, or null if no ancestor {@link Conf} with
+     * the given {@link StramElement} type exists.
+     */
     @SuppressWarnings("unchecked")
     public <T extends Conf> T getAncestorConf(StramElement ancestorElement) {
       if (getConfElement().getStramElement() == ancestorElement) {
@@ -973,6 +957,16 @@ public class LogicalPlanConfiguration {
       }
     }
 
+    /**
+     * This method retrieves a child {@link Conf} of the given {@link StramElement} type with the given name. If
+     * a child {@link Conf} with the given name and {@link StramElement} type doesn't exist, then it is added.
+     * @param <T> The type of the child {@link Conf}.
+     * @param id The name of the child {@link Conf}.
+     * @param childType The {@link StramElement} representing the type of the child {@link Conf}.
+     * @param clazz The {@link java.lang.Class} of the child {@link Conf} to add if a {@link Conf} of the given id
+     * and {@link StramElement} type is not present.
+     * @return A child {@link Conf} of this {@link Conf} with the given id and {@link StramElement} type.
+     */
     public <T extends Conf> T getOrAddChild(String id, StramElement childType, Class<T> clazz) {
       @SuppressWarnings("unchecked")
       Map<String, T> elChildren = (Map<String, T>)children.get(childType);
@@ -999,6 +993,15 @@ public class LogicalPlanConfiguration {
       properties.setDefaultProperties(defaults);
     }
 
+    /**
+     * This method returns a list of all the child {@link Conf}s of this {@link Conf} with the matching name
+     * and {@link StramElement} type.
+     * @param <T> The types of the child {@link Conf}s.
+     * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
+     * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
+     * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
+     * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
+     */
     public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType) {
       List<T> childConfs = new ArrayList<>();
       Map<String, T> elChildren = getChildren(childType);
@@ -1038,6 +1041,17 @@ public class LogicalPlanConfiguration {
       return childConfs;
     }
 
+    /**
+     * Returns the {@link Conf} corresponding to the given id from the given map. If a {@link Conf} with the
+     * given id is not present in the given map, then a new {@link Conf} of the given class is created and added
+     * to the map.
+     * @param <T> The type of the {@link Conf}s contained in the map.
+     * @param map The map to retrieve a {@link Conf} from or add a {@link Conf} to.
+     * @param id The name of the {@link Conf} to retrieve from or add to the given map.
+     * @param clazz The {@link java.lang.Class} of the {@link Conf} to add to the given map, if a {@link Conf} with
+     * the given name is not present in the given map.
+     * @return A {@link Conf} with the given name, contained in the given map.
+     */
     protected <T extends Conf> T getOrAddConf(Map<String, T> map, String id, Class<T> clazz) {
       T conf = map.get(id);
       if (conf == null) {
@@ -1046,12 +1060,7 @@ public class LogicalPlanConfiguration {
           conf = declaredConstructor.newInstance(new Object[] {});
           conf.setId(id);
           map.put(id, conf);
-        } catch (IllegalAccessException |
-                 IllegalArgumentException |
-                 InstantiationException |
-                 NoSuchMethodException |
-                 SecurityException |
-                 InvocationTargetException e) {
+        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
           LOG.error("Error instantiating configuration", e);
         }
       }
@@ -1470,6 +1479,19 @@ public class LogicalPlanConfiguration {
     elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class);
   }
 
+  /**
+   * This is a helper method which performs the following checks:<br/><br/>
+   * <ol>
+   *    <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is
+   * the same as the type of the given {@link Conf}, then the given {@link Conf} is returned.</li>
+   *    <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is
+   * a valid parent {@link Conf} type for the given ancestorConf, then the given ancestor {@link Conf} is
+   * returned.</li></ol>
+   * @param element The {@link StramElement} type corresponding to this {@link Conf} or
+   * to a valid ancestor {@link Conf}.
+   * @param ancestorConf The {@link Conf} to return.
+   * @return The given {@link Conf}, or null if the first call to this method passes a null {@link StramElement}.
+   */
   private static Conf getConf(StramElement element, Conf ancestorConf) {
     if (element == ancestorConf.getConfElement().getStramElement()) {
       return ancestorConf;
@@ -1481,9 +1503,23 @@ public class LogicalPlanConfiguration {
     }
     StramElement parentElement = ConfElement.getAllowedParentConf(element);
     Conf parentConf = getConf(parentElement, ancestorConf);
+
+    if(parentConf == null) {
+      throw new IllegalArgumentException("The given StramElement is not the same type as the given ancestorConf, " +
+                                         "and it is not a valid type for a parent conf.");
+    }
+
     return parentConf.getOrAddChild(WILDCARD, element, elementMaps.get(element));
   }
 
+  /**
+   * This method adds a child {@link Conf} with the given {@link StramElement} type and name to the given
+   * ancestorConf.
+   * @param element The {@link StramElement} of the child {@link Conf} to add to the given ancestorConf.
+   * @param name The name of the child {@link Conf} to add to the given ancestorConf.
+   * @param ancestorConf The {@link Conf} to add a child {@link Conf} to.
+   * @return The child {@link Conf} that was added to the given ancestorConf.
+   */
   private static Conf addConf(StramElement element, String name, Conf ancestorConf) {
     StramElement parentElement = ConfElement.getAllowedParentConf(element);
     Conf conf1 = null;
@@ -1494,6 +1530,16 @@ public class LogicalPlanConfiguration {
     return conf1;
   }
 
+  /**
+   * This method returns a list of all the child {@link Conf}s of the given {@link List} of {@link Conf}s with the matching name
+   * and {@link StramElement} type.
+   * @param <T> The types of the child {@link Conf}s.
+   * @param confs The list of {@link Conf}s whose children will be searched.
+   * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
+   * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
+   * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
+   * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
+   */
   private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType) {
     List<T> childConfs = Lists.newArrayList();
     for (Conf conf1 : confs) {
@@ -1685,7 +1731,7 @@ public class LogicalPlanConfiguration {
    * @param index The current index that the parser is on for processing the property name.
    * @param propertyName The original unsplit Apex property name.
    * @param propertyValue The value corresponding to the Apex property.
-   * @param conf
+   * @param conf The current {@link Conf} to add properties to.
    */
   private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf) {
     if (index < keys.length) {
@@ -1697,104 +1743,141 @@ public class LogicalPlanConfiguration {
       if ((element == StramElement.APPLICATION) || (element == StramElement.OPERATOR) || (element == StramElement.STREAM)
               || (element == StramElement.PORT) || (element == StramElement.INPUT_PORT) || (element == StramElement.OUTPUT_PORT)
               || (element == StramElement.TEMPLATE)) {
-        if ((index + 1) < keys.length) {
-          String name = keys[index+1];
-          Conf elConf = addConf(element, name, conf);
-          if (elConf != null) {
-            parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf);
-          } else {
-            LOG.error("Invalid configuration key: {}", propertyName);
-          }
-        } else {
-          LOG.warn("Invalid configuration key: {}", propertyName);
-        }
-      } else if ((element == StramElement.GATEWAY)) {
-        Conf elConf = addConf(element, null, conf);
-        if (elConf != null) {
-          parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf);
-        } else {
-          LOG.error("Invalid configuration key: {}", propertyName);
-        }
+        parseAppElement(index, keys, element, conf, propertyName, propertyValue);
+      } else if (element == StramElement.GATEWAY) {
+        parseGatewayElement(element, conf, keys, index, propertyName, propertyValue);
       } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) {
-        String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
-
-        if (element != StramElement.ATTR) {
-          String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
-          LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
-        }
-
-        if (conf.getConfElement().getStramElement() == null) {
-          conf = addConf(StramElement.APPLICATION, WILDCARD, conf);
-        }
-
-        if (conf != null) {
-          if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
-            //The provided attribute name was a simple name
-
-            if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
-              throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
-            }
-
-            if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
-              throw new ValidationException(attributeName
-                                            + " is not defined for the "
-                                            + conf.getConfElement().getStramElement()
-                                            + " or any of its child configurations.");
-            }
-
-            if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
-              //If the attribute name is ambiguous at this configuration level we should tell the user.
-              LOG.warn("The attribute "
-                       + attributeName
-                       + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
-            }
-
-            if (conf.getConfElement().getContextAttributes().contains(attributeName)) {
-              @SuppressWarnings("unchecked")
-              Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
-              conf.setAttribute(attr, propertyValue);
-            } else {
-              AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue);
-            }
-          } else {
-            //This is a FQ attribute name
-            Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
-
-            //Convert to a simple name
-            attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
-
-            if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
-              throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
-            }
+        parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
+      } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
+        parsePropertyElement(element, keys, index, conf, propertyValue, propertyName);
+      } else if (element != null) {
+        conf.parseElement(element, keys, index, propertyValue);
+      }
+    }
+  }
 
-            ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
+  /**
+   * This is a helper method for {@link #parseStramPropertyTokens} which parses an element that is followed by a name (for example an application or an operator): a child {@link Conf} with that name is added to {@code conf1} and parsing continues after the name.
+   * @param element The current {@link StramElement} of the property being parsed.
+   * @param keys The keys that the property being parsed was split into.
+   * @param index The index of the current key that the parser is on.
+   * @param propertyValue The value associated with the property being parsed.
+   * @param propertyName The complete unprocessed name of the property being parsed.
+   */
+  private void parseAppElement(int index, String[] keys, StramElement element, Conf conf1, String propertyName, String propertyValue)
+  {
+    if ((index + 1) < keys.length) { // the element key must be followed by a name key
+      String name = keys[index+1];
+      Conf elConf = addConf(element, name, conf1);
+      if (elConf != null) {
+        parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf); // resume after the name key
+      } else {
+        LOG.error("Invalid configuration key: {}", propertyName); // addConf returned null
+      }
+    } else {
+      LOG.warn("Invalid configuration key: {}", propertyName); // no name key follows the element key
+    }
+  }
 
-            conf = ConfElement.addConfs(conf, confWithAttr);
+  /**
+   * This is a helper method for {@link #parseStramPropertyTokens} which parses a gateway element: a child {@link Conf} is added to {@code conf1} with a null name and parsing continues at the next key.
+   * @param element The current {@link StramElement} of the property being parsed.
+   * @param keys The keys that the property being parsed was split into.
+   * @param index The index of the current key that the parser is on.
+   * @param propertyValue The value associated with the property being parsed.
+   * @param propertyName The complete unprocessed name of the property being parsed.
+   */
+  private void parseGatewayElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
+  {
+    Conf elConf = addConf(element, null, conf1); // no name key is consumed for this element
+    if (elConf != null) {
+      parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf); // resume at the key after the element key
+    } else {
+      LOG.error("Invalid configuration key: {}", propertyName); // addConf returned null
+    }
+  }
 
-            @SuppressWarnings("unchecked")
-            Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
-            conf.setAttribute(attr, propertyValue);
-          }
-        } else {
-          LOG.error("Invalid configuration key: {}", propertyName);
+  /**
+   * This is a helper method for {@link #parseStramPropertyTokens} which parses an attribute given either as a simple name or as a fully qualified name and applies its value; a {@link ValidationException} is thrown when the name is not a known attribute.
+   * @param element The current {@link StramElement} of the property being parsed.
+   * @param keys The keys that the property being parsed was split into.
+   * @param index The index of the current key that the parser is on.
+   * @param conf The current {@link Conf} that the attribute is being specified on.
+   * @param propertyValue The value associated with the property being parsed.
+   * @param propertyName The complete unprocessed name of the property being parsed.
+   */
+  private void parseAttributeElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
+  {
+    String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
+    if (element != StramElement.ATTR) { // the attribute was referenced without the explicit attr element
+      String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
+      LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
+    }
+    if (conf.getConfElement().getStramElement() == null) {
+      conf = addConf(StramElement.APPLICATION, WILDCARD, conf); // continue on a wildcard application conf instead
+    }
+    if (conf != null) { // conf is null only when the addConf call above returned null
+      if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
+        //The provided attribute name was a simple name
+        if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
+          throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
         }
-      } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
-        // Currently opProps are only supported on operators and streams
-        // Supporting current implementation where property can be directly specified under operator
-        String prop;
-        if (element == StramElement.PROP) {
-          prop = getCompleteKey(keys, index+1);
-        } else {
-          prop = getCompleteKey(keys, index);
+        if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
+          throw new ValidationException(attributeName + " is not defined for the " + conf.getConfElement().getStramElement() + " or any of its child configurations.");
+        }
+        if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
+          //If the attribute name is ambiguous at this configuration level we should tell the user.
+          LOG.warn("The attribute " + attributeName + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
         }
-        if (prop != null) {
-          conf.setProperty(prop, propertyValue);
+        if (conf.getConfElement().getContextAttributes().contains(attributeName)) { // set the attribute directly on this conf
+          @SuppressWarnings(value = "unchecked")
+                  Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
+          conf.setAttribute(attr, propertyValue);
         } else {
-          LOG.warn("Invalid property specification, no property name specified for {}", propertyName);
+          AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue); // NOTE(review): presumably applies the value to child confs - confirm in AttributeParseUtils
         }
-      } else if (element != null) {
-        conf.parseElement(element, keys, index, propertyValue);
+      } else {
+        //This is a FQ attribute name
+        Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
+        //Convert to a simple name
+        attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
+        if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
+          throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
+        }
+        ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
+        conf = ConfElement.addConfs(conf, confWithAttr); // NOTE(review): presumably returns the conf for confWithAttr reached from conf - confirm in ConfElement
+        @SuppressWarnings("unchecked")
+        Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
+        conf.setAttribute(attr, propertyValue);
       }
+    } else {
+      LOG.error("Invalid configuration key: {}", propertyName);
+    }
+  }
+
+  /**
+   * This is a helper method for {@link #parseStramPropertyTokens} which parses a property: the property name is built with {@code getCompleteKey} and the value is set under that name on the given {@link Conf}.
+   * @param element The current {@link StramElement} of the property being parsed.
+   * @param keys The keys that the property being parsed was split into.
+   * @param index The index of the current key that the parser is on.
+   * @param conf The current {@link Conf} that the property is set on.
+   * @param propertyValue The value associated with the property being parsed.
+   * @param propertyName The complete unprocessed name of the property being parsed.
+   */
+  private void parsePropertyElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
+  {
+    // Currently opProps are only supported on operators and streams
+    // Supporting current implementation where property can be directly specified under operator
+    String prop;
+    if (element == StramElement.PROP) {
+      prop = getCompleteKey(keys, index+1); // skip the explicit prop element key
+    } else {
+      prop = getCompleteKey(keys, index); // no prop element key was given, so the name starts at the current key
+    }
+    if (prop != null) {
+      conf.setProperty(prop, propertyValue);
+    } else {
+      LOG.warn("Invalid property specification, no property name specified for {}", propertyName);
     }
   }
 
@@ -1831,7 +1914,12 @@ public class LogicalPlanConfiguration {
    * @return The completed key.
    */
   private static String getCompleteKey(String[] keys, int start, int end) {
-    StringBuilder sb = new StringBuilder(1024);
+    int length = 0;
+    for (int keyIndex = 0; keyIndex < keys.length; keyIndex++) {
+      length += keys[keyIndex].length();
+    }
+
+    StringBuilder sb = new StringBuilder(length);
     for (int i = start; i < end; ++i) {
       if (i > start) {
         sb.append(KEY_SEPARATOR);