You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:40 UTC
[09/50] incubator-apex-core git commit: Schema Support
Schema Support
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/61929b58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/61929b58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/61929b58
Branch: refs/heads/master
Commit: 61929b58f9dbf32c281c845197445e728fffa866
Parents: 66a75e0
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Aug 2 13:29:16 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Wed Aug 5 13:13:05 2015 -0700
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Context.java | 7 +
.../annotation/InputPortFieldAnnotation.java | 10 +-
.../annotation/OutputPortFieldAnnotation.java | 10 +
.../java/com/datatorrent/stram/cli/DTCli.java | 15 +-
.../stram/plan/logical/LogicalPlan.java | 15 ++
.../plan/logical/LogicalPlanConfiguration.java | 26 +-
.../stram/webapp/OperatorDiscoverer.java | 235 +++++++++++++------
.../com/datatorrent/stram/webapp/TypeGraph.java | 28 +++
.../plan/LogicalPlanConfigurationTest.java | 65 ++++-
.../stram/plan/SchemaTestOperator.java | 33 +++
.../stram/webapp/OperatorDiscoveryTest.java | 64 ++++-
.../src/test/resources/schemaTestTopology.json | 43 ++++
12 files changed, 473 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 1417389..249cecd 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -151,6 +151,13 @@ public interface Context
* a generic codec.
*/
Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<StreamCodec<?>>(new Object2String<StreamCodec<?>>());
+
+ /**
+ * Provides the tuple class which the port receives or emits. While this attribute is null by default,
+ * whether it is needed or not is controlled through the port annotation.
+ */
+ Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Object2String<Class<?>>());
+
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
index 965eab3..2734bf6 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
@@ -16,8 +16,8 @@
package com.datatorrent.api.annotation;
import java.lang.annotation.*;
+
/**
- *
* Annotation for input ports on streaming operators.<p>
*
* @since 0.3.2
@@ -33,4 +33,12 @@ public @interface InputPortFieldAnnotation
* @return - true if port is optional, false otherwise.
*/
public boolean optional() default false;
+
+ /**
+ * Whether this port needs to know the tuple class. When true, the application must set the
+ * TUPLE_CLASS port attribute on this port; otherwise DAG validation will fail.
+ *
+ * @return true if schema is required; false otherwise.
+ */
+ public boolean schemaRequired() default false;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
index 154c1df..bb585c6 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
@@ -21,6 +21,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import com.datatorrent.api.Context;
/**
*
@@ -40,4 +41,13 @@ public @interface OutputPortFieldAnnotation {
* <p>error.</p>
*/
public boolean error() default false;
+
+ /**
+ * Whether this port needs to know the tuple class. When true, the application must set the
+ * TUPLE_CLASS port attribute on this port; otherwise DAG validation will fail.
+ *
+ * @return true if schema is required; false otherwise.
+ */
+ public boolean schemaRequired() default false;
}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
index 936ba25..eff2404 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
@@ -3012,6 +3012,8 @@ public class DTCli
JSONObject portClassHier = new JSONObject();
JSONObject failed = new JSONObject();
+ JSONObject portTypesWithSchemaClasses = new JSONObject();
+
for (Class<? extends Operator> clazz : operatorClasses) {
try {
JSONObject oper = operatorDiscoverer.describeOperator(clazz);
@@ -3021,8 +3023,15 @@ public class DTCli
String s = defaultValueMapper.writeValueAsString(operIns);
oper.put("defaultValue", new JSONObject(s).get(clazz.getName()));
- // add class hier info to portClassHier
- operatorDiscoverer.buildPortClassHier(oper, portClassHier);
+ // add class hierarchy info to portClassHier and fetch port types with schema classes
+ operatorDiscoverer.buildAdditionalPortInfo(oper, portClassHier, portTypesWithSchemaClasses);
+
+ Iterator portTypesIter = portTypesWithSchemaClasses.keys();
+ while (portTypesIter.hasNext()) {
+ if (!portTypesWithSchemaClasses.getBoolean((String) portTypesIter.next())) {
+ portTypesIter.remove();
+ }
+ }
arr.put(oper);
} catch (Exception | NoClassDefFoundError ex) {
@@ -3031,8 +3040,10 @@ public class DTCli
failed.put(cls, ex.toString());
}
}
+
json.put("operatorClasses", arr);
json.put("portClassHier", portClassHier);
+ json.put("portTypesWithSchemaClasses", portTypesWithSchemaClasses);
if (failed.length() > 0) {
json.put("failedOperators", failed);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index b1e7d94..fc182cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1170,6 +1170,13 @@ public class LogicalPlan implements Serializable, DAG
validateThreadLocal(n);
}
}
+
+ if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
+ //since schema is required, the port attribute TUPLE_CLASS should be present
+ if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
+ throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
+ }
+ }
}
}
@@ -1179,6 +1186,14 @@ public class LogicalPlan implements Serializable, DAG
if (pm.portAnnotation != null && !pm.portAnnotation.optional()) {
throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
}
+ } else {
+ //port is connected
+ if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
+ //since schema is required, the port attribute TUPLE_CLASS should be present
+ if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
+ throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
+ }
+ }
}
allPortsOptional &= (pm.portAnnotation != null && pm.portAnnotation.optional());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 3e3326b..d838a2d 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -77,6 +77,7 @@ public class LogicalPlanConfiguration {
public static final String STREAM_SINKS = "sinks";
public static final String STREAM_TEMPLATE = "template";
public static final String STREAM_LOCALITY = "locality";
+ public static final String STREAM_SCHEMA = "schema";
public static final String OPERATOR_PREFIX = StreamingApplication.DT_PREFIX + "operator.";
public static final String OPERATOR_CLASSNAME = "classname";
@@ -908,6 +909,11 @@ public class LogicalPlanConfiguration {
if (locality != null) {
prop.setProperty(streamPrefix + STREAM_LOCALITY, locality);
}
+ JSONObject schema = stream.optJSONObject("schema");
+ if (schema != null) {
+ String schemaClass = schema.getString("class");
+ prop.setProperty(streamPrefix + STREAM_SCHEMA, schemaClass);
+ }
}
return addFromProperties(prop, conf);
}
@@ -1126,6 +1132,16 @@ public class LogicalPlanConfiguration {
DAG.StreamMeta sd = dag.addStream(streamConfEntry.getKey());
sd.setLocality(streamConf.getLocality());
+ String schemaClassName = streamConf.properties.getProperty(STREAM_SCHEMA);
+ Class<?> schemaClass = null;
+ if (schemaClassName != null) {
+ try {
+ schemaClass = Class.forName(schemaClassName);
+ } catch (ClassNotFoundException e) {
+ throw new ValidationException("schema class not found: " + schemaClassName);
+ }
+ }
+
if (streamConf.sourceNode != null) {
String portName = null;
for (Map.Entry<String, StreamConf> e : streamConf.sourceNode.outputs.entrySet()) {
@@ -1137,6 +1153,10 @@ public class LogicalPlanConfiguration {
Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor();
Operators.describe(sourceDecl, sourcePortMap);
sd.setSource(sourcePortMap.outputPorts.get(portName).component);
+
+ if (schemaClass != null) {
+ dag.setOutputPortAttribute(sourcePortMap.outputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
+ }
}
for (OperatorConf targetNode : streamConf.targetNodes) {
@@ -1150,6 +1170,10 @@ public class LogicalPlanConfiguration {
Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor();
Operators.describe(targetDecl, targetPortMap);
sd.addSink(targetPortMap.inputPorts.get(portName).component);
+
+ if (schemaClass != null) {
+ dag.setInputPortAttribute(targetPortMap.inputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
+ }
}
}
@@ -1164,7 +1188,7 @@ public class LogicalPlanConfiguration {
*/
public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
{
- // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attt below is used
+ // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null? conf.get(GATEWAY_LISTEN_ADDRESS): connectAddress);
if (app != null) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
index 60e35da..004c100 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
@@ -22,8 +22,10 @@ import com.datatorrent.stram.webapp.TypeDiscoverer.UI_TYPE;
import com.datatorrent.stram.webapp.asm.CompactAnnotationNode;
import com.datatorrent.stram.webapp.asm.CompactFieldNode;
import com.google.common.base.Predicate;
+import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.beans.*;
@@ -44,7 +46,6 @@ import javax.xml.parsers.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.codehaus.jettison.json.*;
-import org.apache.xbean.asm5.tree.AnnotationNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.Attributes;
@@ -60,6 +61,8 @@ import org.xml.sax.helpers.DefaultHandler;
*/
public class OperatorDiscoverer
{
+ public static final String GENERATED_CLASSES_JAR = "_generated-classes.jar";
+
private static class ClassComparator implements Comparator<Class<?>> {
@Override
@@ -73,21 +76,34 @@ public class OperatorDiscoverer
private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoverer.class);
private final List<String> pathsToScan = new ArrayList<String>();
private final ClassLoader classLoader;
- private final String dtOperatorDoclinkPrefix = "https://www.datatorrent.com/docs/apidocs/index.html";
+ private static final String DT_OPERATOR_DOCLINK_PREFIX = "https://www.datatorrent.com/docs/apidocs/index.html";
public static final String PORT_TYPE_INFO_KEY = "portTypeInfo";
private final TypeGraph typeGraph = TypeGraphFactory.createTypeGraphProtoType();
+ private static final String USE_SCHEMA_TAG = "@useSchema";
+ private static final String DESCRIPTION_TAG = "@description";
+ private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+?");
+
+ private static final String SCHEMA_REQUIRED_KEY = "schemaRequired";
+
private final Map<String, OperatorClassInfo> classInfo = new HashMap<String, OperatorClassInfo>();
private static class OperatorClassInfo {
String comment;
final Map<String, String> tags = new HashMap<String, String>();
- final Map<String, String> getMethods = new HashMap<String, String>();
- final Map<String, String> setMethods = new HashMap<String, String>();
+ final Map<String, MethodInfo> getMethods = Maps.newHashMap();
+ final Map<String, MethodInfo> setMethods = Maps.newHashMap();
final Set<String> invisibleGetSetMethods = new HashSet<String>();
final Map<String, String> fields = new HashMap<String, String>();
}
+ private static class MethodInfo
+ {
+ Map<String, String> descriptions = Maps.newHashMap();
+ Map<String, String> useSchemas = Maps.newHashMap();
+ String comment;
+ }
+
private class JavadocSAXHandler extends DefaultHandler {
private String className = null;
@@ -122,11 +138,19 @@ public class OperatorDiscoverer
else if (qName.equalsIgnoreCase("tag")) {
if (oci != null) {
String tagName = attributes.getValue("name");
- String tagText = attributes.getValue("text");
+ String tagText = attributes.getValue("text").trim();
if (methodName != null) {
- if("@omitFromUI".equals(tagName) && (isGetter(methodName) || isSetter(methodName)))
- {
- oci.invisibleGetSetMethods.add(methodName);
+ boolean lGetterCheck = isGetter(methodName);
+ boolean lSetterCheck = !lGetterCheck && isSetter(methodName);
+
+ if (lGetterCheck || lSetterCheck) {
+ if ("@omitFromUI".equals(tagName)) {
+ oci.invisibleGetSetMethods.add(methodName);
+ } else if (DESCRIPTION_TAG.equals(tagName)) {
+ addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, true);
+ } else if (USE_SCHEMA_TAG.equals(tagName)) {
+ addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, false);
+ }
}
// if ("@return".equals(tagName) && isGetter(methodName)) {
// oci.getMethods.put(methodName, tagText);
@@ -149,6 +173,24 @@ public class OperatorDiscoverer
}
}
+ private void addTagToMethod(Map<String, MethodInfo> methods, String tagText, boolean isDescription)
+ {
+ MethodInfo mi = methods.get(methodName);
+ if (mi == null) {
+ mi = new MethodInfo();
+ methods.put(methodName, mi);
+ }
+ String[] tagParts = Iterables.toArray(Splitter.on(WHITESPACE_PATTERN).trimResults().omitEmptyStrings().
+ limit(2).split(tagText), String.class);
+ if (tagParts.length == 2) {
+ if (isDescription) {
+ mi.descriptions.put(tagParts[0], tagParts[1]);
+ } else {
+ mi.useSchemas.put(tagParts[0], tagParts[1]);
+ }
+ }
+ }
+
@Override
public void endElement(String uri, String localName, String qName) throws SAXException {
if (qName.equalsIgnoreCase("class")) {
@@ -160,9 +202,19 @@ public class OperatorDiscoverer
if (methodName != null) {
// do nothing
if (isGetter(methodName)) {
- oci.getMethods.put(methodName, comment.toString());
+ MethodInfo mi = oci.getMethods.get(methodName);
+ if (mi == null) {
+ mi = new MethodInfo();
+ oci.getMethods.put(methodName, mi);
+ }
+ mi.comment = comment.toString();
} else if (isSetter(methodName)) {
- oci.setMethods.put(methodName, comment.toString());
+ MethodInfo mi = oci.setMethods.get(methodName);
+ if (mi == null) {
+ mi = new MethodInfo();
+ oci.setMethods.put(methodName, mi);
+ }
+ mi.comment = comment.toString();
}
}
else if (fieldName != null) {
@@ -236,7 +288,7 @@ public class OperatorDiscoverer
{
Map<String, JarFile> openJarFiles = new HashMap<String, JarFile>();
Map<String, File> openClassFiles = new HashMap<String, File>();
- try {
+ try {
for (String path : pathsToScan) {
File f = null;
try {
@@ -244,6 +296,9 @@ public class OperatorDiscoverer
if (!f.exists() || f.isDirectory() || (!f.getName().endsWith("jar") && !f.getName().endsWith("class"))) {
continue;
}
+ if (GENERATED_CLASSES_JAR.equals(f.getName())) {
+ continue;
+ }
if (f.getName().endsWith("class")) {
typeGraph.addNode(f);
openClassFiles.put(path, f);
@@ -410,6 +465,9 @@ public class OperatorDiscoverer
if (!inputPort.has("optional")) {
inputPort.put("optional", false); // input port that is not annotated is default to be not optional
}
+ if (!inputPort.has(SCHEMA_REQUIRED_KEY)) {
+ inputPort.put(SCHEMA_REQUIRED_KEY, false);
+ }
inputPorts.put(inputPort);
}
@@ -422,6 +480,9 @@ public class OperatorDiscoverer
if (!outputPort.has("error")) {
outputPort.put("error", false);
}
+ if (!outputPort.has(SCHEMA_REQUIRED_KEY)) {
+ outputPort.put(SCHEMA_REQUIRED_KEY, false);
+ }
outputPorts.put(outputPort);
}
@@ -471,7 +532,7 @@ public class OperatorDiscoverer
}
else if (clazz.getName().startsWith("com.datatorrent.lib.") ||
clazz.getName().startsWith("com.datatorrent.contrib.")) {
- response.put("doclink", dtOperatorDoclinkPrefix + "?" + getDocName(clazz));
+ response.put("doclink", DT_OPERATOR_DOCLINK_PREFIX + "?" + getDocName(clazz));
}
}
}
@@ -531,10 +592,10 @@ public class OperatorDiscoverer
if (oci.invisibleGetSetMethods.contains(getPrefix + propName) || oci.invisibleGetSetMethods.contains(setPrefix + propName)) {
continue;
}
- String desc = oci.setMethods.get(setPrefix + propName);
- desc = desc == null ? oci.getMethods.get(getPrefix + propName) : desc;
- if (desc != null) {
- propJ.put("description", desc);
+ MethodInfo methodInfo = oci.setMethods.get(setPrefix + propName);
+ methodInfo = methodInfo == null ? oci.getMethods.get(getPrefix + propName) : methodInfo;
+ if (methodInfo != null) {
+ addTagsToProperties(methodInfo, propJ);
}
result.put(propJ);
}
@@ -553,6 +614,32 @@ public class OperatorDiscoverer
}
}
+ private void addTagsToProperties(MethodInfo mi, JSONObject propJ) throws JSONException
+ {
+ //create description object. description tag enables the visual tools to display description of keys/values
+ //of a map property, items of a list, properties within a complex type.
+ JSONObject descriptionObj = new JSONObject();
+ if (mi.comment != null) {
+ descriptionObj.put("$", mi.comment);
+ }
+ for (Map.Entry<String, String> descEntry : mi.descriptions.entrySet()) {
+ descriptionObj.put(descEntry.getKey(), descEntry.getValue());
+ }
+ if (descriptionObj.length() > 0) {
+ propJ.put("descriptions", descriptionObj);
+ }
+
+ //create useSchema object. useSchema tag is added to enable visual tools to be able to render a text field
+ //as a dropdown with choices populated from the schema attached to the port.
+ JSONObject useSchemaObj = new JSONObject();
+ for (Map.Entry<String, String> useSchemaEntry : mi.useSchemas.entrySet()) {
+ useSchemaObj.put(useSchemaEntry.getKey(), useSchemaEntry.getValue());
+ }
+ if (useSchemaObj.length() > 0) {
+ propJ.put("useSchema", useSchemaObj);
+ }
+ }
+
public JSONObject describeClass(String clazzName) throws Exception
{
return describeClassByASM(clazzName);
@@ -626,9 +713,9 @@ public class OperatorDiscoverer
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
OperatorClassInfo oci = classInfo.get(c.getName());
if (oci != null) {
- String getMethodDesc = oci.getMethods.get(readMethod.getName());
- if (getMethodDesc != null) {
- propertyObj.put("description", oci.getMethods.get(readMethod.getName()));
+ MethodInfo getMethodInfo = oci.getMethods.get(readMethod.getName());
+ if (getMethodInfo != null) {
+ addTagsToProperties(getMethodInfo, propertyObj);
break;
}
}
@@ -673,74 +760,79 @@ public class OperatorDiscoverer
/**
* Enrich portClassHier with class/interface names that map to a list of parent classes/interfaces.
- * For any class encountered, find its parents too.
+ * For any class encountered, find its parents too.<br/>
+ * Also find the port types which have assignable schema classes.
*
- * @param oper Operator to work on
- * @param portClassHier In-Out param that contains a mapping of class/interface to its parents
+ * @param oper Operator to work on
+ * @param portClassHierarchy In-Out param that contains a mapping of class/interface to its parents
+ * @param portTypesWithSchemaClasses In-Out param that maps each examined port type to whether it has any schema classes.
*/
- public void buildPortClassHier(JSONObject oper, JSONObject portClassHier) {
+ public void buildAdditionalPortInfo(JSONObject oper, JSONObject portClassHierarchy, JSONObject portTypesWithSchemaClasses)
+ {
try {
JSONArray ports = oper.getJSONArray(OperatorDiscoverer.PORT_TYPE_INFO_KEY);
- int num_ports = ports.length();
- for (int i = 0; i < num_ports; i++) {
+ for (int i = 0; i < ports.length(); i++) {
JSONObject port = ports.getJSONObject(i);
- String type;
- try {
- type = port.getString("type");
- } catch (JSONException e) {
- // no type key
+ String portType = port.optString("type");
+ if (portType == null) {
+ //skipping if port type is null
continue;
}
- try {
- // load the port type class
- Class<?> portClazz = classLoader.loadClass(type.replaceAll("\\bclass ", "").replaceAll("\\binterface ", ""));
-
- // iterate up the class hierarchy to populate the portClassHier map
- while (portClazz != null) {
- ArrayList<String> parents = new ArrayList<String>();
+ if (typeGraph.size() == 0) {
+ buildTypeGraph();
+ }
- String portClazzName = portClazz.toString();
- if (portClassHier.has(portClazzName)) {
- // already present in portClassHier, so we can stop
- break;
+ try {
+ //building port class hierarchy
+ LinkedList<String> queue = Lists.newLinkedList();
+ queue.add(portType);
+ while (!queue.isEmpty()) {
+ String currentType = queue.remove();
+ if (portClassHierarchy.has(currentType)) {
+ //already present in the json so we skip.
+ continue;
}
-
- // interfaces and Object are at the top of the tree, so we can just put them
- // in portClassHier with empty parents, then move on.
- if (portClazz.isInterface() || portClazzName.equals("java.lang.Object")) {
- portClassHier.put(portClazzName, parents);
- break;
+ List<String> immediateParents = typeGraph.getParents(currentType);
+ if (immediateParents == null) {
+ portClassHierarchy.put(currentType, Lists.<String>newArrayList());
+ continue;
}
+ portClassHierarchy.put(currentType, immediateParents);
+ queue.addAll(immediateParents);
+ }
+ } catch (JSONException e) {
+ LOG.warn("building port type hierarchy {}", portType, e);
+ }
- // look at superclass first
- Class<?> superClazz = portClazz.getSuperclass();
- try {
- String superClazzName = superClazz.toString();
- parents.add(superClazzName);
- } catch (NullPointerException e) {
- LOG.info("Superclass is null for `{}` ({})", portClazz, superClazz);
- }
- // then look at interfaces implemented in this port
- for (Class<?> intf : portClazz.getInterfaces()) {
- String intfName = intf.toString();
- if (!portClassHier.has(intfName)) {
- // add the interface to portClassHier
- portClassHier.put(intfName, new ArrayList<String>());
- }
- parents.add(intfName);
+ //finding port types with schema classes
+ if (portTypesWithSchemaClasses.has(portType)) {
+ //already present in the json so skipping
+ continue;
+ }
+ if (portType.equals("byte") || portType.equals("short") || portType.equals("char") || portType.equals("int")
+ || portType.equals("long") || portType.equals("float") || portType.equals("double")
+ || portType.equals("java.lang.String") || portType.equals("java.lang.Object")) {
+ //ignoring primitives, strings and object types as this information is needed only for complex types.
+ continue;
+ }
+ if (port.has("typeArgs")) {
+ //ignoring any type with generics
+ continue;
+ }
+ boolean hasSchemaClasses = false;
+ for (String descendant : typeGraph.getInstantiableDescendants(portType)) {
+ try {
+ if (typeGraph.isInstantiableBean(descendant)) {
+ hasSchemaClasses = true;
+ break;
}
-
- // now store class=>parents mapping in portClassHier
- portClassHier.put(portClazzName, parents);
-
- // walk up the hierarchy for the next iteration
- portClazz = superClazz;
+ } catch (JSONException ex) {
+ LOG.warn("checking descendant is instantiable {}", descendant);
}
- } catch (ClassNotFoundException e) {
- LOG.info("Could not make class from `{}`", type);
}
+ portTypesWithSchemaClasses.put(portType, hasSchemaClasses);
}
} catch (JSONException e) {
// should not reach this
@@ -763,5 +855,4 @@ public class OperatorDiscoverer
return typeGraph;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
index b06bb76..61dc99d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
@@ -1152,4 +1152,32 @@ public class TypeGraph
return result;
}
+ /**
+ * A utility method that tells whether a class is considered a bean.<br/>
+ * For simplicity we exclude classes that have any type-args.
+ *
+ * @param className name of the class
+ * @return true if it is a bean; false otherwise.
+ */
+ public boolean isInstantiableBean(String className) throws JSONException
+ {
+ JSONObject classDesc = describeClass(className);
+ if (classDesc.has("typeArgs")) {
+ //any type with generics is not considered a bean
+ return false;
+ }
+ JSONArray classProps = classDesc.optJSONArray("properties");
+ if (classProps == null || classProps.length() == 0) {
+ //no properties then cannot be a bean
+ return false;
+ }
+ for (int p = 0; p < classProps.length(); p++) {
+ JSONObject propDesc = classProps.getJSONObject(p);
+ if (propDesc.optBoolean("canGet", false)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
index c46fb5b..af12575 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
@@ -21,6 +21,8 @@ import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.*;
+import javax.validation.ValidationException;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -682,6 +684,63 @@ public class LogicalPlanConfigurationTest {
}
}
+ @Test
+ public void testTupleClassAttr() throws Exception
+ {
+ String resourcePath = "/schemaTestTopology.json";
+ InputStream is = this.getClass().getResourceAsStream(resourcePath);
+ if (is == null) {
+ fail("Could not load " + resourcePath);
+ }
+ StringWriter writer = new StringWriter();
+
+ IOUtils.copy(is, writer);
+ JSONObject json = new JSONObject(writer.toString());
+
+ Configuration conf = new Configuration(false);
+
+ LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+ dag.validate();
+
+ OperatorMeta operator1 = dag.getOperatorMeta("operator1");
+ assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass());
+
+ StreamMeta input1 = dag.getStream("inputStream");
+ assertNotNull(input1);
+ for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
+ Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS));
+ }
+ }
+
+ @Test
+ public void testTupleClassAttrValidation() throws Exception
+ {
+ String resourcePath = "/schemaTestTopology.json";
+ InputStream is = this.getClass().getResourceAsStream(resourcePath);
+ if (is == null) {
+ fail("Could not load " + resourcePath);
+ }
+ StringWriter writer = new StringWriter();
+
+ IOUtils.copy(is, writer);
+ JSONObject json = new JSONObject(writer.toString());
+
+ //removing schema so that validation fails
+ json.getJSONArray("streams").getJSONObject(0).remove("schema");
+ Configuration conf = new Configuration(false);
+
+ LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+
+ try {
+ dag.validate();
+ Assert.fail();
+ } catch (ValidationException ve) {
+ //test passes because the validation exception was thrown.
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
public static class TestApplication implements StreamingApplication {
@@ -789,7 +848,11 @@ public class LogicalPlanConfigurationTest {
return false;
return true;
}
-
+
+ }
+
+ public static class TestSchema
+ {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
new file mode 100644
index 0000000..59aaade
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.stram.plan;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.stram.engine.GenericTestOperator;
+
+/**
+ * Test operator that extends {@link GenericTestOperator} with one additional input port
+ * whose annotation declares {@code schemaRequired = true}.
+ */
+public class SchemaTestOperator extends GenericTestOperator
+{
+ /**
+ * Input port annotated as requiring a schema.
+ */
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public final transient InputPort<Object> schemaRequiredPort = new DefaultInputPort<Object>()
+ {
+ @Override
+ public final void process(Object payload)
+ {
+ // no-op: incoming tuples are discarded
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 8f8b632..ad915c8 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -41,6 +41,8 @@ import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.InputOperator;
+
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -170,7 +172,7 @@ public class OperatorDiscoveryTest
OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(classFilePath);
operatorDiscoverer.buildTypeGraph();
- // make sure (de)serialization of type graph works withtout problem
+ // make sure (de)serialization of type graph works without problem
Kryo kryo = new Kryo();
TypeGraph.TypeGraphSerializer tgs = new TypeGraph.TypeGraphSerializer();
kryo.register(TypeGraph.class, tgs);
@@ -1033,4 +1035,64 @@ public class OperatorDiscoveryTest
}
+ /**
+ * Input operator with three output ports that differ only in how {@code schemaRequired}
+ * is declared: explicitly {@code true}, explicitly {@code false}, and not annotated at all.
+ * Described by {@code testPortSchema}.
+ */
+ public static class SchemaRequiredOperator extends BaseOperator implements InputOperator
+ {
+ // schema explicitly required
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+
+ // schema explicitly not required
+ @OutputPortFieldAnnotation(schemaRequired = false)
+ public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+
+ // no annotation; testPortSchema expects schemaRequired to be reported as false
+ public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>();
+
+ @Override
+ public void emitTuples()
+ {
+ // no tuples are emitted
+ }
+ }
+
+ /**
+ * Checks that the operator description reports {@code schemaRequired} for each output port
+ * of {@link SchemaRequiredOperator}: {@code true} for {@code output}, and {@code false} for
+ * both {@code output1} and the un-annotated {@code output2}.
+ */
+ @Test
+ public void testPortSchema() throws Exception
+ {
+ OperatorDiscoverer discoverer = new OperatorDiscoverer(getClassFileInClasspath());
+ discoverer.buildTypeGraph();
+ JSONObject description = discoverer.describeOperator(SchemaRequiredOperator.class);
+ JSONArray outputPorts = description.getJSONArray("outputPorts");
+
+ final int portCount = outputPorts.length();
+ Assert.assertEquals("no. of ports", 3, portCount);
+
+ // ports are matched by name, so the order in which they are described does not matter
+ int idx = 0;
+ while (idx < portCount) {
+ JSONObject port = outputPorts.getJSONObject(idx++);
+ String portName = port.getString("name");
+ if ("output".equals(portName)) {
+ Assert.assertEquals("output schema", true, port.getBoolean("schemaRequired"));
+ } else if ("output1".equals(portName)) {
+ Assert.assertEquals("output1 schema", false, port.getBoolean("schemaRequired"));
+ } else if ("output2".equals(portName)) {
+ Assert.assertEquals("output2 schema", false, port.getBoolean("schemaRequired"));
+ }
+ }
+ }
+
+ /**
+ * Checks the additional port information built for {@link SubSubClassGeneric}: the port
+ * class hierarchy must hold an entry for {@code java.lang.String} with four elements, and
+ * no port type may be reported with a schema class.
+ */
+ @Test
+ public void testAdditionalPortInfo() throws Exception
+ {
+ OperatorDiscoverer discoverer = new OperatorDiscoverer(getClassFileInClasspath());
+ discoverer.buildTypeGraph();
+
+ // containers handed to buildAdditionalPortInfo and inspected afterwards
+ JSONObject hierarchy = new JSONObject();
+ JSONObject schemaPorts = new JSONObject();
+ JSONObject description = discoverer.describeOperator(SubSubClassGeneric.class);
+ discoverer.buildAdditionalPortInfo(description, hierarchy, schemaPorts);
+
+ JSONArray stringAncestors = hierarchy.optJSONArray("java.lang.String");
+ Assert.assertNotNull("string hierarchy", stringAncestors);
+ Assert.assertEquals("number of immediate ancestors", 4, stringAncestors.length());
+ Assert.assertEquals("number of port types with schema", 0, schemaPorts.length());
+ }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/resources/schemaTestTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/schemaTestTopology.json b/engine/src/test/resources/schemaTestTopology.json
new file mode 100644
index 0000000..6c779fd
--- /dev/null
+++ b/engine/src/test/resources/schemaTestTopology.json
@@ -0,0 +1,43 @@
+{
+ "operators": [
+ {
+ "name": "inputOperator",
+ "class": "com.datatorrent.stram.engine.TestGeneratorInputOperator",
+ "properties": {
+ "com.datatorrent.stram.engine.TestGeneratorInputOperator": {
+ "myConfigProperty": "myConfigPropertyValue"
+ }
+ },
+ "ports": [
+ {
+ "name": "outport",
+ "attributes": {
+ "UNIFIER_LIMIT": 8
+ }
+ }
+ ]
+ },
+ {
+ "name": "operator1",
+ "class": "com.datatorrent.stram.plan.SchemaTestOperator"
+ }
+ ],
+ "streams": [
+ {
+ "name": "inputStream",
+ "source": {
+ "operatorName": "inputOperator",
+ "portName": "outport"
+ },
+ "sinks": [
+ {
+ "operatorName": "operator1",
+ "portName": "schemaRequiredPort"
+ }
+ ],
+ "schema": {
+ "class": "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema"
+ }
+ }
+ ]
+}