You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/24 04:37:58 UTC
[39/50] [abbrv] incubator-apex-core git commit: APEX-28 #resolve
APEX-28 #resolve
- Rename of files requires a separate commit to preserve attribution.
- Improved documentation
- Added unit test to make sure that attributes declared in multiple contexts have the same type.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/977093e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/977093e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/977093e1
Branch: refs/heads/feature-module
Commit: 977093e171f1183985ae80d42b0d6dbc3af6cbc5
Parents: 434a717
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Aug 25 18:03:08 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Wed Sep 16 15:31:44 2015 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/api/Attribute.java | 11 +-
.../main/java/com/datatorrent/api/Context.java | 10 -
.../stram/plan/logical/LogicalPlan.java | 6 +-
.../plan/logical/LogicalPlanConfiguration.java | 472 +++--
.../plan/LogicalPlanConfigurationTest.java | 1788 ------------------
.../datatorrent/stram/plan/LogicalPlanTest.java | 990 ----------
.../logical/LogicalPlanConfigurationTest.java | 1511 +++++++++++++++
.../stram/plan/logical/LogicalPlanTest.java | 988 ++++++++++
.../src/test/resources/schemaTestTopology.json | 2 +-
9 files changed, 2785 insertions(+), 2993 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 4c16a2a..a7492b5 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -277,13 +277,6 @@ public class Attribute<T> implements Serializable
if (map.containsKey(clazz)) {
return 0;
}
-
- map.put(clazz, getAttributesNoSave(clazz));
- return (long)clazz.getModifiers() << 32 | clazz.hashCode();
- }
-
- public static Set<Attribute<Object>> getAttributesNoSave(Class<?> clazz)
- {
Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
try {
for (Field f: clazz.getDeclaredFields()) {
@@ -330,8 +323,8 @@ public class Attribute<T> implements Serializable
catch (Exception ex) {
DTThrowable.rethrow(ex);
}
-
- return set;
+ map.put(clazz, set);
+ return (long)clazz.getModifiers() << 32 | clazz.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index c2d974a..cd10398 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -33,16 +33,6 @@ import com.datatorrent.api.annotation.Stateless;
*/
public interface Context
{
- /*
- * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that
- * Attribute is required to be the same accross all Context classes. This is required because if a simple attribute
- * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If
- * there were multiple Attributes specified in different Contexts with the same name, but a different type, then
- * it would not be possible to set the values of Attributes specified by a simple attribute name in the root
- * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context
- * class would be a backwards incompatible change.
- */
-
/**
* Get the attributes associated with this context.
* The returned map does not contain any attributes that may have been defined in the parent context of this context.
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 8826896..94d18ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1088,7 +1088,7 @@ public class LogicalPlan implements Serializable, DAG
if (e.getKey().getOperatorWrapper() == om) {
stream.sinks.remove(e.getKey());
}
- // If persistStream was enabled for stream, reset stream when sink removed
+ // If persistStream was enabled for stream, reset stream when sink removed
stream.resetStreamPersistanceOnSinkRemoval(e.getKey());
}
this.operators.remove(om.getName());
@@ -1431,11 +1431,11 @@ public class LogicalPlan implements Serializable, DAG
for (StreamMeta s: streams.values()) {
if (s.source == null) {
- throw new ValidationException(String.format("stream source not connected: %s", s.getName()));
+ throw new ValidationException("Stream source not connected: " + s.getName());
}
if (s.sinks.isEmpty()) {
- throw new ValidationException(String.format("stream sink not connected: %s", s.getName()));
+ throw new ValidationException("Stream sink not connected: " + s.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index a3a18c2..7a53cd7 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -15,6 +15,7 @@
*/
package com.datatorrent.stram.plan.logical;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -22,14 +23,17 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
import java.util.*;
import java.util.Map.Entry;
-import jline.internal.Preconditions;
import javax.validation.ValidationException;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -61,7 +65,6 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
import com.datatorrent.stram.util.ObjectMapperFactory;
/**
@@ -159,43 +162,16 @@ public class LogicalPlanConfiguration {
*/
protected enum ConfElement
{
- @SuppressWarnings("SetReplaceableByEnumSet")
- STRAM(null,
- null,
- new HashSet<StramElement>(),
- null),
- @SuppressWarnings("SetReplaceableByEnumSet")
- APPLICATION(StramElement.APPLICATION,
- STRAM,
- new HashSet<StramElement>(),
- DAGContext.class),
- @SuppressWarnings("SetReplaceableByEnumSet")
- TEMPLATE(StramElement.TEMPLATE,
- STRAM,
- new HashSet<StramElement>(),
- null),
- @SuppressWarnings("SetReplaceableByEnumSet")
- GATEWAY(StramElement.GATEWAY,
- ConfElement.APPLICATION,
- new HashSet<StramElement>(),
- null),
- @SuppressWarnings("SetReplaceableByEnumSet")
- OPERATOR(StramElement.OPERATOR,
- ConfElement.APPLICATION,
- new HashSet<StramElement>(),
- OperatorContext.class),
- @SuppressWarnings("SetReplaceableByEnumSet")
- STREAM(StramElement.STREAM,
- ConfElement.APPLICATION,
- new HashSet<StramElement>(),
- null),
- PORT(StramElement.PORT,
- ConfElement.OPERATOR,
- Sets.newHashSet(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT),
- PortContext.class);
-
- public static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
- public static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
+ STRAM(null, null, null, null),
+ APPLICATION(StramElement.APPLICATION, STRAM, null, DAGContext.class),
+ TEMPLATE(StramElement.TEMPLATE, STRAM, null, null),
+ GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null),
+ OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class),
+ STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null),
+ PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class);
+
+ protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
+ protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
static {
initialize();
@@ -246,12 +222,8 @@ public class LogicalPlanConfiguration {
}
if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) {
- throw new IllegalStateException("All the context classes "
- + ContextUtils.CONTEXT_CLASSES
- + " found in "
- + Context.class
- + " are not used by ConfElements "
- + confElementContextClasses);
+ throw new IllegalStateException("All the context classes " + ContextUtils.CONTEXT_CLASSES + " found in "
+ + Context.class + " are not used by ConfElements " + confElementContextClasses);
}
}
@@ -312,16 +284,15 @@ public class LogicalPlanConfiguration {
this.element = element;
this.parent = parent;
- this.allRelatedElements.addAll(additionalRelatedElements);
+ if (additionalRelatedElements != null) {
+ this.allRelatedElements.addAll(additionalRelatedElements);
+ }
+
this.allRelatedElements.add(element);
this.contextClass = contextClass;
- if (contextClass != null) {
- this.contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
- } else {
- this.contextAttributes = Sets.newHashSet();
- }
+ this.contextAttributes = contextClass != null ? ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass) : new HashSet<String>();
}
private void setAllChildAttributes(Set<String> allChildAttributes)
@@ -445,8 +416,7 @@ public class LogicalPlanConfiguration {
*
* @param conf The current {@link Conf} type.
* @return A path from the current {@link Conf} type to a root {@link Conf} type, which includes the current and root
- * {
- * @lin Conf} types.
+ * {@link Conf} types.
*/
public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf)
{
@@ -471,8 +441,7 @@ public class LogicalPlanConfiguration {
*
* @param conf The current {@link Conf} type.
* @return A path from the root {@link Conf} type to the current {@link Conf} type, which includes the current and root
- * {
- * @lin Conf} types.
+ * {@link Conf} types.
*/
public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf)
{
@@ -487,11 +456,9 @@ public class LogicalPlanConfiguration {
* @param child The current {@link Conf} type.
* @param parent The parent {@link Conf} type.
* @return A path from the current {@link Conf} type to a parent {@link Conf} type, which includes the current and parent
- * {
- * @lin Conf} types.
+ * {@link Conf} types.
*/
- public static List<StramElement> getPathFromChildToParentInclusive(StramElement child,
- StramElement parent)
+ public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, StramElement parent)
{
ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child);
@@ -528,11 +495,9 @@ public class LogicalPlanConfiguration {
* @param child The current {@link Conf} type.
* @param parent The parent {@link Conf} type.
* @return A path from the parent {@link Conf} type to the current {@link Conf} type, which includes the current and parent
- * {
- * @lin Conf} types.
+ * {@link Conf} types.
*/
- public static List<StramElement> getPathFromParentToChildInclusive(StramElement child,
- StramElement parent)
+ public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, StramElement parent)
{
List<StramElement> path = getPathFromChildToParentInclusive(child,
parent);
@@ -548,8 +513,7 @@ public class LogicalPlanConfiguration {
* @return The {@link ConfElement} that contains the given attribute, or null if no {@link ConfElement} contains
* the given attribute.
*/
- public static ConfElement findConfElementWithAttribute(ConfElement current,
- String simpleAttributeName)
+ public static ConfElement findConfElementWithAttribute(ConfElement current, String simpleAttributeName)
{
if (current.getContextAttributes().contains(simpleAttributeName)) {
return current;
@@ -573,9 +537,7 @@ public class LogicalPlanConfiguration {
List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(childConfElement.getStramElement(),
parentConf.getConfElement().getStramElement());
- for (int pathIndex = 1;
- pathIndex < path.size();
- pathIndex++) {
+ for (int pathIndex = 1; pathIndex < path.size(); pathIndex++) {
LOG.debug("Adding conf");
StramElement pathElement = path.get(pathIndex);
//Add the configurations we need to hold this attribute
@@ -593,12 +555,19 @@ public class LogicalPlanConfiguration {
@SuppressWarnings("unchecked")
protected static class ContextUtils
{
- public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES;
- public static final Set<Class<? extends Context>> CONTEXT_CLASSES;
- public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE;
+ private static final Map<String, Type> ATTRIBUTES_TO_TYPE = Maps.newHashMap();
+ public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
+ public static final Set<Class<? extends Context>> CONTEXT_CLASSES = Sets.newHashSet();
+ public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();
static {
- CONTEXT_CLASSES = Sets.newHashSet();
+ initialize();
+ }
+
+ @VisibleForTesting
+ protected static void initialize()
+ {
+ CONTEXT_CLASSES.clear();
for (Class<?> clazz: Context.class.getDeclaredClasses()) {
if (!Context.class.isAssignableFrom(clazz)) {
@@ -608,9 +577,17 @@ public class LogicalPlanConfiguration {
CONTEXT_CLASSES.add((Class<? extends Context>)clazz);
}
- CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
+ buildAttributeMaps(CONTEXT_CLASSES);
+ }
- for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+ @VisibleForTesting
+ protected static void buildAttributeMaps(Set<Class<? extends Context>> contextClasses)
+ {
+ CONTEXT_CLASS_TO_ATTRIBUTES.clear();
+ CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.clear();
+ ATTRIBUTES_TO_TYPE.clear();
+
+ for (Class<? extends Context> contextClass: contextClasses) {
Set<String> contextAttributes = Sets.newHashSet();
Field[] fields = contextClass.getDeclaredFields();
@@ -620,19 +597,29 @@ public class LogicalPlanConfiguration {
continue;
}
+ Type fieldType = ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0];
contextAttributes.add(field.getName());
+
+ Type existingType = ATTRIBUTES_TO_TYPE.get(field.getName());
+
+ if (existingType != null && !existingType.equals(fieldType)) {
+ throw new ValidationException("The attribute " + field.getName() +
+ " is defined with two different types in two different context classes: " +
+ fieldType + " and " + existingType + "\n" +
+ "Attributes with the same name are required to have the same type accross all Context classes.");
+ }
+
+ ATTRIBUTES_TO_TYPE.put(field.getName(), fieldType);
}
CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, contextAttributes);
}
- CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();
-
- for (Class<? extends Context> contextClass: CONTEXT_CLASSES) {
+ for (Class<? extends Context> contextClass: contextClasses) {
Map<String, Attribute<?>> simpleAttributeNameToAttribute = Maps.newHashMap();
CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, simpleAttributeNameToAttribute);
- Set<Attribute<Object>> attributes = AttributeInitializer.getAttributesNoSave(contextClass);
+ Set<Attribute<Object>> attributes = AttributeInitializer.getAttributes(contextClass);
LOG.debug("context class {} and attributes {}", contextClass, attributes);
@@ -644,6 +631,7 @@ public class LogicalPlanConfiguration {
private ContextUtils()
{
+ //Private constructor to prevent instantiation of utility class
}
/**
@@ -735,6 +723,7 @@ public class LogicalPlanConfiguration {
private AttributeParseUtils()
{
+ //Private constructor to prevent instantiation of utility class
}
/**
@@ -782,11 +771,7 @@ public class LogicalPlanConfiguration {
{
if (element != null && element != StramElement.ATTR) {
- throw new IllegalArgumentException("The given "
- + StramElement.class
- + " must either have a value of null or "
- + StramElement.ATTR
- + " but it had a value of " + element);
+ throw new IllegalArgumentException("The given " + StramElement.class + " must either have a value of null or " + StramElement.ATTR + " but it had a value of " + element);
}
String attributeName;
@@ -823,9 +808,7 @@ public class LogicalPlanConfiguration {
public static Class<? extends Context> getContainingContextClass(String attributeName)
{
if (isSimpleAttributeName(attributeName)) {
- throw new IllegalArgumentException("The given attribute name "
- + attributeName
- + " is simple.");
+ throw new IllegalArgumentException("The given attribute name " + attributeName + " is simple.");
}
LOG.debug("Attribute Name {}", attributeName);
@@ -847,9 +830,7 @@ public class LogicalPlanConfiguration {
if (Context.class.isAssignableFrom(clazz)) {
contextClass = (Class<? extends Context>)clazz;
} else {
- throw new IllegalArgumentException("The provided context class name "
- + contextClassName
- + " is not valid.");
+ throw new IllegalArgumentException("The provided context class name " + contextClassName + " is not valid.");
}
} catch (ClassNotFoundException ex) {
throw new IllegalArgumentException(ex);
@@ -858,9 +839,7 @@ public class LogicalPlanConfiguration {
String simpleAttributeName = getSimpleAttributeName(attributeName);
if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(simpleAttributeName)) {
- throw new ValidationException(simpleAttributeName
- + " is not a valid attribute of "
- + contextClass);
+ throw new ValidationException(simpleAttributeName + " is not a valid attribute of " + contextClass);
}
return contextClass;
@@ -879,9 +858,7 @@ public class LogicalPlanConfiguration {
}
if (attributeName.endsWith(KEY_SEPARATOR)) {
- throw new IllegalArgumentException("The given attribute name ends with \""
- + KEY_SEPARATOR
- + "\" so a simple name cannot be extracted.");
+ throw new IllegalArgumentException("The given attribute name ends with \"" + KEY_SEPARATOR + "\" so a simple name cannot be extracted.");
}
return attributeName.substring(attributeName.lastIndexOf(KEY_SEPARATOR) + 1, attributeName.length());
@@ -961,6 +938,13 @@ public class LogicalPlanConfiguration {
return (T)parentConf;
}
+ /**
+ * Gets an ancestor {@link Conf} of this {@link Conf} of the given {@link StramElement} type.
+ * @param <T> The {@link Conf} Class of the ancestor conf
+ * @param ancestorElement The {@link StramElement} representing the type of the ancestor {@link Conf}.
+ * @return The ancestor {@link Conf} of the corresponding {@link StramElement} type, or null if no ancestor {@link Conf} with
+ * the given {@link StramElement} type exists.
+ */
@SuppressWarnings("unchecked")
public <T extends Conf> T getAncestorConf(StramElement ancestorElement) {
if (getConfElement().getStramElement() == ancestorElement) {
@@ -973,6 +957,16 @@ public class LogicalPlanConfiguration {
}
}
+ /**
+ * This method retrieves a child {@link Conf} of the given {@link StramElement} type with the given name. If
+ * a child {@link Conf} with the given name and {@link StramElement} type doesn't exist, then it is added.
+ * @param <T> The type of the child {@link Conf}.
+ * @param id The name of the child {@link Conf}.
+ * @param childType The {@link StramElement} representing the type of the child {@link Conf}.
+ * @param clazz The {@link java.lang.Class} of the child {@link Conf} to add if a {@link Conf} of the given id
+ * and {@link StramElement} type is not present.
+ * @return A child {@link Conf} of this {@link Conf} with the given id and {@link StramElement} type.
+ */
public <T extends Conf> T getOrAddChild(String id, StramElement childType, Class<T> clazz) {
@SuppressWarnings("unchecked")
Map<String, T> elChildren = (Map<String, T>)children.get(childType);
@@ -999,6 +993,15 @@ public class LogicalPlanConfiguration {
properties.setDefaultProperties(defaults);
}
+ /**
+ * This method returns a list of all the child {@link Conf}s of this {@link Conf} with the matching name
+ * and {@link StramElement} type.
+ * @param <T> The types of the child {@link Conf}s.
+ * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
+ * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
+ * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
+ * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
+ */
public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType) {
List<T> childConfs = new ArrayList<>();
Map<String, T> elChildren = getChildren(childType);
@@ -1038,6 +1041,17 @@ public class LogicalPlanConfiguration {
return childConfs;
}
+ /**
+ * Returns the {@link Conf} corresponding to the given id from the given map. If a {@link Conf} with the
+ * given id is not present in the given map, then a new {@link Conf} of the given class is created and added
+ * to the map.
+ * @param <T> The type of the {@link Conf}s contained in the map.
+ * @param map The map to retrieve a {@link Conf} from or add a {@link Conf} to.
+ * @param id The name of the {@link Conf} to retrieve from or add to the given map.
+ * @param clazz The {@link java.lang.Class} of the {@link Conf} to add to the given map, if a {@link Conf} with
+ * the given name is not present in the given map.
+ * @return A {@link Conf} with the given name, contained in the given map.
+ */
protected <T extends Conf> T getOrAddConf(Map<String, T> map, String id, Class<T> clazz) {
T conf = map.get(id);
if (conf == null) {
@@ -1046,12 +1060,7 @@ public class LogicalPlanConfiguration {
conf = declaredConstructor.newInstance(new Object[] {});
conf.setId(id);
map.put(id, conf);
- } catch (IllegalAccessException |
- IllegalArgumentException |
- InstantiationException |
- NoSuchMethodException |
- SecurityException |
- InvocationTargetException e) {
+ } catch (IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
LOG.error("Error instantiating configuration", e);
}
}
@@ -1470,6 +1479,19 @@ public class LogicalPlanConfiguration {
elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class);
}
+ /**
+ * This is a helper method which performs the following checks:<br/><br/>
+ * <ol>
+ * <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is
+ * the same as the type of the given {@link Conf}, then the given {@link Conf} is returned.</li>
+ * <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is
+ * a valid parent {@link Conf} type for the given ancestorConf, then the given ancestor {@link Conf} is
+ * returned.</li></ol>
+ * @param element The {@link StramElement} type corresponding to this {@link Conf} or
+ * to a valid ancestor {@link Conf}.
+ * @param ancestorConf The {@link Conf} to return.
+ * @return The given {@link Conf}, or null if the first call to this method passes a null {@link StramElement}.
+ */
private static Conf getConf(StramElement element, Conf ancestorConf) {
if (element == ancestorConf.getConfElement().getStramElement()) {
return ancestorConf;
@@ -1481,9 +1503,23 @@ public class LogicalPlanConfiguration {
}
StramElement parentElement = ConfElement.getAllowedParentConf(element);
Conf parentConf = getConf(parentElement, ancestorConf);
+
+ if(parentConf == null) {
+ throw new IllegalArgumentException("The given StramElement is not the same type as the given ancestorConf, " +
+ "and it is not a valid type for a parent conf.");
+ }
+
return parentConf.getOrAddChild(WILDCARD, element, elementMaps.get(element));
}
+ /**
+ * This method adds a child {@link Conf} with the given {@link StramElement} type and name to the given
+ * ancestorConf.
+ * @param element The {@link StramElement} of the child {@link Conf} to add to the given ancestorConf.
+ * @param name The name of the child {@link Conf} to add to the given ancestorConf.
+ * @param ancestorConf The {@link Conf} to add a child {@link Conf} to.
+ * @return The child {@link Conf} that was added to the given ancestorConf.
+ */
private static Conf addConf(StramElement element, String name, Conf ancestorConf) {
StramElement parentElement = ConfElement.getAllowedParentConf(element);
Conf conf1 = null;
@@ -1494,6 +1530,16 @@ public class LogicalPlanConfiguration {
return conf1;
}
+ /**
+ * This method returns a list of all the child {@link Conf}s of the given {@link List} of {@link Conf}s with the matching name
+ * and {@link StramElement} type.
+ * @param <T> The types of the child {@link Conf}s.
+ * @param confs The list of {@link Conf}s whose children will be searched.
+ * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
+ * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
+ * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
+ * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
+ */
private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType) {
List<T> childConfs = Lists.newArrayList();
for (Conf conf1 : confs) {
@@ -1685,7 +1731,7 @@ public class LogicalPlanConfiguration {
* @param index The current index that the parser is on for processing the property name.
* @param propertyName The original unsplit Apex property name.
* @param propertyValue The value corresponding to the Apex property.
- * @param conf
+ * @param conf The current {@link Conf} to add properties to.
*/
private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf) {
if (index < keys.length) {
@@ -1697,104 +1743,141 @@ public class LogicalPlanConfiguration {
if ((element == StramElement.APPLICATION) || (element == StramElement.OPERATOR) || (element == StramElement.STREAM)
|| (element == StramElement.PORT) || (element == StramElement.INPUT_PORT) || (element == StramElement.OUTPUT_PORT)
|| (element == StramElement.TEMPLATE)) {
- if ((index + 1) < keys.length) {
- String name = keys[index+1];
- Conf elConf = addConf(element, name, conf);
- if (elConf != null) {
- parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf);
- } else {
- LOG.error("Invalid configuration key: {}", propertyName);
- }
- } else {
- LOG.warn("Invalid configuration key: {}", propertyName);
- }
- } else if ((element == StramElement.GATEWAY)) {
- Conf elConf = addConf(element, null, conf);
- if (elConf != null) {
- parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf);
- } else {
- LOG.error("Invalid configuration key: {}", propertyName);
- }
+ parseAppElement(index, keys, element, conf, propertyName, propertyValue);
+ } else if (element == StramElement.GATEWAY) {
+ parseGatewayElement(element, conf, keys, index, propertyName, propertyValue);
} else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) {
- String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
-
- if (element != StramElement.ATTR) {
- String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
- LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
- }
-
- if (conf.getConfElement().getStramElement() == null) {
- conf = addConf(StramElement.APPLICATION, WILDCARD, conf);
- }
-
- if (conf != null) {
- if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
- //The provided attribute name was a simple name
-
- if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
- throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
- }
-
- if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
- throw new ValidationException(attributeName
- + " is not defined for the "
- + conf.getConfElement().getStramElement()
- + " or any of its child configurations.");
- }
-
- if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
- //If the attribute name is ambiguous at this configuration level we should tell the user.
- LOG.warn("The attribute "
- + attributeName
- + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
- }
-
- if (conf.getConfElement().getContextAttributes().contains(attributeName)) {
- @SuppressWarnings("unchecked")
- Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
- conf.setAttribute(attr, propertyValue);
- } else {
- AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue);
- }
- } else {
- //This is a FQ attribute name
- Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
-
- //Convert to a simple name
- attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
-
- if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
- throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
- }
+ parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
+ } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
+ parsePropertyElement(element, keys, index, conf, propertyValue, propertyName);
+ } else if (element != null) {
+ conf.parseElement(element, keys, index, propertyValue);
+ }
+ }
+ }
- ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
+ /**
+ * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a named element
+ * (application, operator, stream, port, input port, output port or template).
+ * @param element The current {@link StramElement} of the property being parsed.
+ * @param keys The keys that the property being parsed was split into.
+ * @param index The index of the current key that the parser is on.
+ * @param conf1 The current {@link Conf}, to which the child {@link Conf} for the element is added.
+ * @param propertyValue The value associated with the property being parsed.
+ * @param propertyName The complete unprocessed name of the property being parsed.
+ */
+ private void parseAppElement(int index, String[] keys, StramElement element, Conf conf1, String propertyName, String propertyValue)
+ {
+ if ((index + 1) < keys.length) {
+ String name = keys[index+1];
+ Conf elConf = addConf(element, name, conf1);
+ if (elConf != null) {
+ parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf);
+ } else {
+ LOG.error("Invalid configuration key: {}", propertyName);
+ }
+ } else {
+ LOG.warn("Invalid configuration key: {}", propertyName);
+ }
+ }
- conf = ConfElement.addConfs(conf, confWithAttr);
+ /**
+ * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a gateway element.
+ * @param element The current {@link StramElement} of the property being parsed.
+ * @param conf1 The current {@link Conf}, to which the gateway {@link Conf} is added.
+ * @param keys The keys that the property being parsed was split into.
+ * @param index The index of the current key that the parser is on.
+ * @param propertyValue The value associated with the property being parsed.
+ * @param propertyName The complete unprocessed name of the property being parsed.
+ */
+ private void parseGatewayElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
+ {
+ Conf elConf = addConf(element, null, conf1);
+ if (elConf != null) {
+ parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf);
+ } else {
+ LOG.error("Invalid configuration key: {}", propertyName);
+ }
+ }
- @SuppressWarnings("unchecked")
- Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
- conf.setAttribute(attr, propertyValue);
- }
- } else {
- LOG.error("Invalid configuration key: {}", propertyName);
+ /**
+ * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute.
+ * @param element The current {@link StramElement} of the property being parsed.
+ * @param keys The keys that the property being parsed was split into.
+ * @param index The index of the current key that the parser is on.
+ * @param conf The current {@link Conf}.
+ * @param propertyValue The value associated with the property being parsed.
+ * @param propertyName The complete unprocessed name of the property being parsed.
+ */
+ private void parseAttributeElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
+ {
+ String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
+ if (element != StramElement.ATTR) {
+ String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
+ LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
+ }
+ if (conf.getConfElement().getStramElement() == null) {
+ conf = addConf(StramElement.APPLICATION, WILDCARD, conf);
+ }
+ if (conf != null) {
+ if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
+ //The provided attribute name was a simple name
+ if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
+ throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
}
- } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
- // Currently opProps are only supported on operators and streams
- // Supporting current implementation where property can be directly specified under operator
- String prop;
- if (element == StramElement.PROP) {
- prop = getCompleteKey(keys, index+1);
- } else {
- prop = getCompleteKey(keys, index);
+ if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
+ throw new ValidationException(attributeName + " is not defined for the " + conf.getConfElement().getStramElement() + " or any of its child configurations.");
+ }
+ if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
+ //If the attribute name is ambiguous at this configuration level we should tell the user.
+ LOG.warn("The attribute " + attributeName + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
}
- if (prop != null) {
- conf.setProperty(prop, propertyValue);
+ if (conf.getConfElement().getContextAttributes().contains(attributeName)) {
+ @SuppressWarnings(value = "unchecked")
+ Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
+ conf.setAttribute(attr, propertyValue);
} else {
- LOG.warn("Invalid property specification, no property name specified for {}", propertyName);
+ AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue);
}
- } else if (element != null) {
- conf.parseElement(element, keys, index, propertyValue);
+ } else {
+ //This is a FQ attribute name
+ Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
+ //Convert to a simple name
+ attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
+ if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
+ throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
+ }
+ ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
+ conf = ConfElement.addConfs(conf, confWithAttr);
+ @SuppressWarnings("unchecked")
+ Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
+ conf.setAttribute(attr, propertyValue);
}
+ } else {
+ LOG.error("Invalid configuration key: {}", propertyName);
+ }
+ }
+
+ /**
+ * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a prop.
+ * @param element The current {@link StramElement} of the property being parsed.
+ * @param keys The keys that the property being parsed was split into.
+ * @param index The index of the current key that the parser is on.
+ * @param conf The current {@link Conf}.
+ * @param propertyValue The value associated with the property being parsed.
+ * @param propertyName The complete unprocessed name of the property being parsed.
+ */
+ private void parsePropertyElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
+ {
+ // Currently opProps are only supported on operators and streams
+ // Supporting current implementation where property can be directly specified under operator
+ String prop;
+ if (element == StramElement.PROP) {
+ prop = getCompleteKey(keys, index+1);
+ } else {
+ prop = getCompleteKey(keys, index);
+ }
+ if (prop != null) {
+ conf.setProperty(prop, propertyValue);
+ } else {
+ LOG.warn("Invalid property specification, no property name specified for {}", propertyName);
}
}
@@ -1831,7 +1914,12 @@ public class LogicalPlanConfiguration {
* @return The completed key.
*/
private static String getCompleteKey(String[] keys, int start, int end) {
- StringBuilder sb = new StringBuilder(1024);
+ int length = 0;
+ for (int keyIndex = 0; keyIndex < keys.length; keyIndex++) {
+ length += keys[keyIndex].length();
+ }
+
+ StringBuilder sb = new StringBuilder(length);
for (int i = start; i < end; ++i) {
if (i > start) {
sb.append(KEY_SEPARATOR);