You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:40 UTC

[09/50] incubator-apex-core git commit: Schema Support

Schema Support


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/61929b58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/61929b58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/61929b58

Branch: refs/heads/master
Commit: 61929b58f9dbf32c281c845197445e728fffa866
Parents: 66a75e0
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Aug 2 13:29:16 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Wed Aug 5 13:13:05 2015 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |   7 +
 .../annotation/InputPortFieldAnnotation.java    |  10 +-
 .../annotation/OutputPortFieldAnnotation.java   |  10 +
 .../java/com/datatorrent/stram/cli/DTCli.java   |  15 +-
 .../stram/plan/logical/LogicalPlan.java         |  15 ++
 .../plan/logical/LogicalPlanConfiguration.java  |  26 +-
 .../stram/webapp/OperatorDiscoverer.java        | 235 +++++++++++++------
 .../com/datatorrent/stram/webapp/TypeGraph.java |  28 +++
 .../plan/LogicalPlanConfigurationTest.java      |  65 ++++-
 .../stram/plan/SchemaTestOperator.java          |  33 +++
 .../stram/webapp/OperatorDiscoveryTest.java     |  64 ++++-
 .../src/test/resources/schemaTestTopology.json  |  43 ++++
 12 files changed, 473 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 1417389..249cecd 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -151,6 +151,13 @@ public interface Context
      * a generic codec.
      */
     Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<StreamCodec<?>>(new Object2String<StreamCodec<?>>());
+
+    /**
+     * Provides the tuple class which the port receives or emits. While this attribute is null by default,
+     * whether it is needed or not is controlled through the port annotation.
+     */
+    Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Object2String<Class<?>>());
+
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
index 965eab3..2734bf6 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
@@ -16,8 +16,8 @@
 package com.datatorrent.api.annotation;
 
 import java.lang.annotation.*;
+
 /**
- *
  * Annotation for input ports on streaming operators.<p>
  *
  * @since 0.3.2
@@ -33,4 +33,12 @@ public @interface InputPortFieldAnnotation
    * @return - true if port is optional, false otherwise.
    */
   public boolean optional() default false;
+
+  /**
+   * Whether this port needs to know the tuple class. When true, the application must set the
+   * TUPLE_CLASS port attribute on this port; otherwise DAG validation will fail.
+   *
+   * @return true if schema is required; false otherwise.
+   */
+  public boolean schemaRequired() default false;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
index 154c1df..bb585c6 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
@@ -21,6 +21,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+import com.datatorrent.api.Context;
 
 /**
  *
@@ -40,4 +41,13 @@ public @interface OutputPortFieldAnnotation {
    * <p>error.</p>
    */
   public boolean error() default false;
+
+  /**
+   * Whether this port needs to know the tuple class. When true, the application must set the
+   * TUPLE_CLASS port attribute on this port; otherwise DAG validation will fail.
+   *
+   * @return  true if schema is required; false otherwise.
+   */
+  public boolean schemaRequired() default false;
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
index 936ba25..eff2404 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
@@ -3012,6 +3012,8 @@ public class DTCli
         JSONObject portClassHier = new JSONObject();
 
         JSONObject failed = new JSONObject();
+        JSONObject portTypesWithSchemaClasses = new JSONObject();
+
         for (Class<? extends Operator> clazz : operatorClasses) {
           try {
             JSONObject oper = operatorDiscoverer.describeOperator(clazz);
@@ -3021,8 +3023,15 @@ public class DTCli
             String s = defaultValueMapper.writeValueAsString(operIns);
             oper.put("defaultValue", new JSONObject(s).get(clazz.getName()));
             
-            // add class hier info to portClassHier
-            operatorDiscoverer.buildPortClassHier(oper, portClassHier);
+            // add class hierarchy info to portClassHier and fetch port types with schema classes
+            operatorDiscoverer.buildAdditionalPortInfo(oper, portClassHier, portTypesWithSchemaClasses);
+
+            Iterator portTypesIter = portTypesWithSchemaClasses.keys();
+            while (portTypesIter.hasNext()) {
+              if (!portTypesWithSchemaClasses.getBoolean((String) portTypesIter.next())) {
+                portTypesIter.remove();
+              }
+            }
 
             arr.put(oper);
           } catch (Exception | NoClassDefFoundError ex) {
@@ -3031,8 +3040,10 @@ public class DTCli
             failed.put(cls, ex.toString());
           }
         }
+
         json.put("operatorClasses", arr);
         json.put("portClassHier", portClassHier);
+        json.put("portTypesWithSchemaClasses", portTypesWithSchemaClasses);
         if (failed.length() > 0) {
           json.put("failedOperators", failed);
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index b1e7d94..fc182cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1170,6 +1170,13 @@ public class LogicalPlan implements Serializable, DAG
               validateThreadLocal(n);
             }
           }
+
+          if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
+            //since schema is required, the port attribute TUPLE_CLASS should be present
+            if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
+              throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
+            }
+          }
         }
       }
 
@@ -1179,6 +1186,14 @@ public class LogicalPlan implements Serializable, DAG
           if (pm.portAnnotation != null && !pm.portAnnotation.optional()) {
             throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
           }
+        } else {
+          //port is connected
+          if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
+            //since schema is required, the port attribute TUPLE_CLASS should be present
+            if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
+              throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
+            }
+          }
         }
         allPortsOptional &= (pm.portAnnotation != null && pm.portAnnotation.optional());
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 3e3326b..d838a2d 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -77,6 +77,7 @@ public class LogicalPlanConfiguration {
   public static final String STREAM_SINKS = "sinks";
   public static final String STREAM_TEMPLATE = "template";
   public static final String STREAM_LOCALITY = "locality";
+  public static final String STREAM_SCHEMA = "schema";
 
   public static final String OPERATOR_PREFIX =  StreamingApplication.DT_PREFIX + "operator.";
   public static final String OPERATOR_CLASSNAME = "classname";
@@ -908,6 +909,11 @@ public class LogicalPlanConfiguration {
       if (locality != null) {
         prop.setProperty(streamPrefix + STREAM_LOCALITY, locality);
       }
+      JSONObject schema = stream.optJSONObject("schema");
+      if (schema != null) {
+        String schemaClass = schema.getString("class");
+        prop.setProperty(streamPrefix + STREAM_SCHEMA, schemaClass);
+      }
     }
     return addFromProperties(prop, conf);
   }
@@ -1126,6 +1132,16 @@ public class LogicalPlanConfiguration {
       DAG.StreamMeta sd = dag.addStream(streamConfEntry.getKey());
       sd.setLocality(streamConf.getLocality());
 
+      String schemaClassName = streamConf.properties.getProperty(STREAM_SCHEMA);
+      Class<?> schemaClass = null;
+      if (schemaClassName != null) {
+        try {
+          schemaClass = Class.forName(schemaClassName);
+        } catch (ClassNotFoundException e) {
+          throw new ValidationException("schema class not found: " + schemaClassName);
+        }
+      }
+
       if (streamConf.sourceNode != null) {
         String portName = null;
         for (Map.Entry<String, StreamConf> e : streamConf.sourceNode.outputs.entrySet()) {
@@ -1137,6 +1153,10 @@ public class LogicalPlanConfiguration {
         Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor();
         Operators.describe(sourceDecl, sourcePortMap);
         sd.setSource(sourcePortMap.outputPorts.get(portName).component);
+
+        if (schemaClass != null) {
+          dag.setOutputPortAttribute(sourcePortMap.outputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
+        }
       }
 
       for (OperatorConf targetNode : streamConf.targetNodes) {
@@ -1150,6 +1170,10 @@ public class LogicalPlanConfiguration {
         Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor();
         Operators.describe(targetDecl, targetPortMap);
         sd.addSink(targetPortMap.inputPorts.get(portName).component);
+
+        if (schemaClass != null) {
+          dag.setInputPortAttribute(targetPortMap.inputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
+        }
       }
     }
 
@@ -1164,7 +1188,7 @@ public class LogicalPlanConfiguration {
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
-    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attt below is used 
+    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
     String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
     dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null? conf.get(GATEWAY_LISTEN_ADDRESS): connectAddress);
     if (app != null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
index 60e35da..004c100 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java
@@ -22,8 +22,10 @@ import com.datatorrent.stram.webapp.TypeDiscoverer.UI_TYPE;
 import com.datatorrent.stram.webapp.asm.CompactAnnotationNode;
 import com.datatorrent.stram.webapp.asm.CompactFieldNode;
 import com.google.common.base.Predicate;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import java.beans.*;
@@ -44,7 +46,6 @@ import javax.xml.parsers.*;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.codehaus.jettison.json.*;
-import org.apache.xbean.asm5.tree.AnnotationNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.Attributes;
@@ -60,6 +61,8 @@ import org.xml.sax.helpers.DefaultHandler;
  */
 public class OperatorDiscoverer
 {
+  public static final String GENERATED_CLASSES_JAR = "_generated-classes.jar";
+
   private static class ClassComparator implements Comparator<Class<?>> {
 
     @Override
@@ -73,21 +76,34 @@ public class OperatorDiscoverer
   private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoverer.class);
   private final List<String> pathsToScan = new ArrayList<String>();
   private final ClassLoader classLoader;
-  private final String dtOperatorDoclinkPrefix = "https://www.datatorrent.com/docs/apidocs/index.html";
+  private static final String DT_OPERATOR_DOCLINK_PREFIX = "https://www.datatorrent.com/docs/apidocs/index.html";
   public static final String PORT_TYPE_INFO_KEY = "portTypeInfo";
   private final TypeGraph typeGraph = TypeGraphFactory.createTypeGraphProtoType();
 
+  private static final String USE_SCHEMA_TAG = "@useSchema";
+  private static final String DESCRIPTION_TAG = "@description";
+  private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+?");
+
+  private static final String SCHEMA_REQUIRED_KEY = "schemaRequired";
+
   private final Map<String, OperatorClassInfo> classInfo = new HashMap<String, OperatorClassInfo>();
 
   private static class OperatorClassInfo {
     String comment;
     final Map<String, String> tags = new HashMap<String, String>();
-    final Map<String, String> getMethods = new HashMap<String, String>();
-    final Map<String, String> setMethods = new HashMap<String, String>();
+    final Map<String, MethodInfo> getMethods = Maps.newHashMap();
+    final Map<String, MethodInfo> setMethods = Maps.newHashMap();
     final Set<String> invisibleGetSetMethods = new HashSet<String>();
     final Map<String, String> fields = new HashMap<String, String>();
   }
 
+  private static class MethodInfo
+  {
+    Map<String, String> descriptions = Maps.newHashMap();
+    Map<String, String> useSchemas = Maps.newHashMap();
+    String comment;
+  }
+
   private class JavadocSAXHandler extends DefaultHandler {
 
     private String className = null;
@@ -122,11 +138,19 @@ public class OperatorDiscoverer
       else if (qName.equalsIgnoreCase("tag")) {
         if (oci != null) {
           String tagName = attributes.getValue("name");
-          String tagText = attributes.getValue("text");
+          String tagText = attributes.getValue("text").trim();
           if (methodName != null) {
-            if("@omitFromUI".equals(tagName) && (isGetter(methodName) || isSetter(methodName)))
-            {
-              oci.invisibleGetSetMethods.add(methodName);
+            boolean lGetterCheck = isGetter(methodName);
+            boolean lSetterCheck = !lGetterCheck && isSetter(methodName);
+
+            if (lGetterCheck || lSetterCheck) {
+              if ("@omitFromUI".equals(tagName)) {
+                oci.invisibleGetSetMethods.add(methodName);
+              } else if (DESCRIPTION_TAG.equals(tagName)) {
+                addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, true);
+              } else if (USE_SCHEMA_TAG.equals(tagName)) {
+                addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, false);
+              }
             }
 //            if ("@return".equals(tagName) && isGetter(methodName)) {
 //              oci.getMethods.put(methodName, tagText);
@@ -149,6 +173,24 @@ public class OperatorDiscoverer
       }
     }
 
+    private void addTagToMethod(Map<String, MethodInfo> methods, String tagText, boolean isDescription)
+    {
+      MethodInfo mi = methods.get(methodName);
+      if (mi == null) {
+        mi = new MethodInfo();
+        methods.put(methodName, mi);
+      }
+      String[] tagParts = Iterables.toArray(Splitter.on(WHITESPACE_PATTERN).trimResults().omitEmptyStrings().
+        limit(2).split(tagText), String.class);
+      if (tagParts.length == 2) {
+        if (isDescription) {
+          mi.descriptions.put(tagParts[0], tagParts[1]);
+        } else {
+          mi.useSchemas.put(tagParts[0], tagParts[1]);
+        }
+      }
+    }
+
     @Override
     public void endElement(String uri, String localName, String qName) throws SAXException {
       if (qName.equalsIgnoreCase("class")) {
@@ -160,9 +202,19 @@ public class OperatorDiscoverer
         if (methodName != null) {
           // do nothing
           if (isGetter(methodName)) {
-            oci.getMethods.put(methodName, comment.toString());
+            MethodInfo mi = oci.getMethods.get(methodName);
+            if (mi == null) {
+              mi = new MethodInfo();
+              oci.getMethods.put(methodName, mi);
+            }
+            mi.comment = comment.toString();
           } else if (isSetter(methodName)) {
-            oci.setMethods.put(methodName, comment.toString());
+            MethodInfo mi = oci.setMethods.get(methodName);
+            if (mi == null) {
+              mi = new MethodInfo();
+              oci.setMethods.put(methodName, mi);
+            }
+            mi.comment = comment.toString();
           }
         }
         else if (fieldName != null) {
@@ -236,7 +288,7 @@ public class OperatorDiscoverer
   {
     Map<String, JarFile> openJarFiles = new HashMap<String, JarFile>();
     Map<String, File> openClassFiles = new HashMap<String, File>();
-    try { 
+    try {
       for (String path : pathsToScan) {
         File f = null;
         try {
@@ -244,6 +296,9 @@ public class OperatorDiscoverer
           if (!f.exists() || f.isDirectory() || (!f.getName().endsWith("jar") && !f.getName().endsWith("class"))) {
             continue;
           }
+          if (GENERATED_CLASSES_JAR.equals(f.getName())) {
+            continue;
+          }
           if (f.getName().endsWith("class")) {
             typeGraph.addNode(f);
             openClassFiles.put(path, f);
@@ -410,6 +465,9 @@ public class OperatorDiscoverer
           if (!inputPort.has("optional")) {
             inputPort.put("optional", false); // input port that is not annotated is default to be not optional
           }
+          if (!inputPort.has(SCHEMA_REQUIRED_KEY)) {
+            inputPort.put(SCHEMA_REQUIRED_KEY, false);
+          }
           inputPorts.put(inputPort);
         }
 
@@ -422,6 +480,9 @@ public class OperatorDiscoverer
           if (!outputPort.has("error")) {
             outputPort.put("error", false);
           }
+          if (!outputPort.has(SCHEMA_REQUIRED_KEY)) {
+            outputPort.put(SCHEMA_REQUIRED_KEY, false);
+          }
           outputPorts.put(outputPort);
         }
 
@@ -471,7 +532,7 @@ public class OperatorDiscoverer
           }
           else if (clazz.getName().startsWith("com.datatorrent.lib.") ||
                   clazz.getName().startsWith("com.datatorrent.contrib.")) {
-            response.put("doclink", dtOperatorDoclinkPrefix + "?" + getDocName(clazz));
+            response.put("doclink", DT_OPERATOR_DOCLINK_PREFIX + "?" + getDocName(clazz));
           }
         }
       }
@@ -531,10 +592,10 @@ public class OperatorDiscoverer
       if (oci.invisibleGetSetMethods.contains(getPrefix + propName) || oci.invisibleGetSetMethods.contains(setPrefix + propName)) {
         continue;
       }
-      String desc = oci.setMethods.get(setPrefix + propName);
-      desc = desc == null ? oci.getMethods.get(getPrefix + propName) : desc;
-      if (desc != null) {
-        propJ.put("description", desc);
+      MethodInfo methodInfo = oci.setMethods.get(setPrefix + propName);
+      methodInfo = methodInfo == null ? oci.getMethods.get(getPrefix + propName) : methodInfo;
+      if (methodInfo != null) {
+        addTagsToProperties(methodInfo, propJ);
       }
       result.put(propJ);
     }
@@ -553,6 +614,32 @@ public class OperatorDiscoverer
     }
   }
 
+  private void addTagsToProperties(MethodInfo mi, JSONObject propJ) throws JSONException
+  {
+    //create description object. description tag enables the visual tools to display description of keys/values
+    //of a map property, items of a list, properties within a complex type.
+    JSONObject descriptionObj = new JSONObject();
+    if (mi.comment != null) {
+      descriptionObj.put("$", mi.comment);
+    }
+    for (Map.Entry<String, String> descEntry : mi.descriptions.entrySet()) {
+      descriptionObj.put(descEntry.getKey(), descEntry.getValue());
+    }
+    if (descriptionObj.length() > 0) {
+      propJ.put("descriptions", descriptionObj);
+    }
+
+    //create useSchema object. useSchema tag is added to enable visual tools to be able to render a text field
+    //as a dropdown with choices populated from the schema attached to the port.
+    JSONObject useSchemaObj = new JSONObject();
+    for (Map.Entry<String, String> useSchemaEntry : mi.useSchemas.entrySet()) {
+      useSchemaObj.put(useSchemaEntry.getKey(), useSchemaEntry.getValue());
+    }
+    if (useSchemaObj.length() > 0) {
+      propJ.put("useSchema", useSchemaObj);
+    }
+  }
+
   public JSONObject describeClass(String clazzName) throws Exception
   {
     return describeClassByASM(clazzName);
@@ -626,9 +713,9 @@ public class OperatorDiscoverer
               for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
                 OperatorClassInfo oci = classInfo.get(c.getName());
                 if (oci != null) {
-                  String getMethodDesc = oci.getMethods.get(readMethod.getName());
-                  if (getMethodDesc != null) {
-                    propertyObj.put("description", oci.getMethods.get(readMethod.getName()));
+                  MethodInfo getMethodInfo = oci.getMethods.get(readMethod.getName());
+                  if (getMethodInfo != null) {
+                    addTagsToProperties(getMethodInfo, propertyObj);
                     break;
                   }
                 }
@@ -673,74 +760,79 @@ public class OperatorDiscoverer
 
   /**
    * Enrich portClassHier with class/interface names that map to a list of parent classes/interfaces.
-   * For any class encountered, find its parents too.
+   * For any class encountered, find its parents too.<br/>
+   * Also find the port types which have assignable schema classes.
    *
-   * @param oper Operator to work on
-   * @param portClassHier In-Out param that contains a mapping of class/interface to its parents
+   * @param oper                       Operator to work on
+   * @param portClassHierarchy         In-Out param that contains a mapping of class/interface to its parents
+   * @param portTypesWithSchemaClasses In-Out param that maps each examined port type to whether it has any schema classes.
    */
-  public void buildPortClassHier(JSONObject oper, JSONObject portClassHier) {
+  public void buildAdditionalPortInfo(JSONObject oper, JSONObject portClassHierarchy, JSONObject portTypesWithSchemaClasses)
+  {
     try {
       JSONArray ports = oper.getJSONArray(OperatorDiscoverer.PORT_TYPE_INFO_KEY);
-      int num_ports = ports.length();
-      for (int i = 0; i < num_ports; i++) {
+      for (int i = 0; i < ports.length(); i++) {
         JSONObject port = ports.getJSONObject(i);
 
-        String type;
-        try {
-          type = port.getString("type");
-        } catch (JSONException e) {
-          // no type key
+        String portType = port.optString("type");
+        if (portType == null) {
+          //skipping if port type is null
           continue;
         }
 
-        try {
-          // load the port type class
-          Class<?> portClazz = classLoader.loadClass(type.replaceAll("\\bclass ", "").replaceAll("\\binterface ", ""));
-
-          // iterate up the class hierarchy to populate the portClassHier map
-          while (portClazz != null) {
-            ArrayList<String> parents = new ArrayList<String>();
+        if (typeGraph.size() == 0) {
+          buildTypeGraph();
+        }
 
-            String portClazzName = portClazz.toString();
-            if (portClassHier.has(portClazzName)) {
-              // already present in portClassHier, so we can stop
-              break;
+        try {
+          //building port class hierarchy
+          LinkedList<String> queue = Lists.newLinkedList();
+          queue.add(portType);
+          while (!queue.isEmpty()) {
+            String currentType = queue.remove();
+            if (portClassHierarchy.has(currentType)) {
+              //already present in the json so we skip.
+              continue;
             }
-
-            // interfaces and Object are at the top of the tree, so we can just put them
-            // in portClassHier with empty parents, then move on.
-            if (portClazz.isInterface() || portClazzName.equals("java.lang.Object")) {
-              portClassHier.put(portClazzName, parents);
-              break;
+            List<String> immediateParents = typeGraph.getParents(currentType);
+            if (immediateParents == null) {
+              portClassHierarchy.put(currentType, Lists.<String>newArrayList());
+              continue;
             }
+            portClassHierarchy.put(currentType, immediateParents);
+            queue.addAll(immediateParents);
+          }
+        } catch (JSONException e) {
+          LOG.warn("building port type hierarchy {}", portType, e);
+        }
 
-            // look at superclass first
-            Class<?> superClazz = portClazz.getSuperclass();
-            try {
-              String superClazzName = superClazz.toString();
-              parents.add(superClazzName);
-            } catch (NullPointerException e) {
-              LOG.info("Superclass is null for `{}` ({})", portClazz, superClazz);
-            }
-            // then look at interfaces implemented in this port
-            for (Class<?> intf : portClazz.getInterfaces()) {
-              String intfName = intf.toString();
-              if (!portClassHier.has(intfName)) {
-                // add the interface to portClassHier
-                portClassHier.put(intfName, new ArrayList<String>());
-              }
-              parents.add(intfName);
+        //finding port types with schema classes
+        if (portTypesWithSchemaClasses.has(portType)) {
+          //already present in the json so skipping
+          continue;
+        }
+        if (portType.equals("byte") || portType.equals("short") || portType.equals("char") || portType.equals("int")
+          || portType.equals("long") || portType.equals("float") || portType.equals("double")
+          || portType.equals("java.lang.String") || portType.equals("java.lang.Object")) {
+          //ignoring primitives, strings and object types as this information is needed only for complex types.
+          continue;
+        }
+        if (port.has("typeArgs")) {
+          //ignoring any type with generics
+          continue;
+        }
+        boolean hasSchemaClasses = false;
+        for (String descendant : typeGraph.getInstantiableDescendants(portType)) {
+          try {
+            if (typeGraph.isInstantiableBean(descendant)) {
+              hasSchemaClasses = true;
+              break;
             }
-
-            // now store class=>parents mapping in portClassHier
-            portClassHier.put(portClazzName, parents);
-
-            // walk up the hierarchy for the next iteration
-            portClazz = superClazz;
+          } catch (JSONException ex) {
+            LOG.warn("checking descendant is instantiable {}", descendant);
           }
-        } catch (ClassNotFoundException e) {
-          LOG.info("Could not make class from `{}`", type);
         }
+        portTypesWithSchemaClasses.put(portType, hasSchemaClasses);
       }
     } catch (JSONException e) {
       // should not reach this
@@ -763,5 +855,4 @@ public class OperatorDiscoverer
     return typeGraph;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
index b06bb76..61dc99d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java
@@ -1152,4 +1152,32 @@ public class TypeGraph
     return result;
   }
 
+  /**
+   * A utility method that tells whether a class is considered a bean.<br/>
+   * For simplicity, classes that have any type-args are excluded.
+   *
+   * @param className name of the class
+   * @return true if the class has no type-args and at least one property with canGet set; false otherwise.
+   */
+  public boolean isInstantiableBean(String className) throws JSONException
+  {
+    JSONObject classDesc = describeClass(className);
+    if (classDesc.has("typeArgs")) {
+      //any type with generics is not considered a bean
+      return false;
+    }
+    JSONArray classProps = classDesc.optJSONArray("properties");
+    if (classProps == null || classProps.length() == 0) {
+      //no properties then cannot be a bean
+      return false;
+    }
+    for (int p = 0; p < classProps.length(); p++) {
+      JSONObject propDesc = classProps.getJSONObject(p);
+      if (propDesc.optBoolean("canGet", false)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
index c46fb5b..af12575 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
@@ -21,6 +21,8 @@ import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.util.*;
 
+import javax.validation.ValidationException;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -682,6 +684,63 @@ public class LogicalPlanConfigurationTest {
     }
   }
 
+  @Test // the "schema" class of a stream in the JSON topology should show up as TUPLE_CLASS on its sink ports
+  public void testTupleClassAttr() throws Exception
+  {
+    String resourcePath = "/schemaTestTopology.json";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    StringWriter writer = new StringWriter();
+
+    IOUtils.copy(is, writer); // read the whole JSON resource into the writer; NOTE(review): the stream is never closed
+    JSONObject json = new JSONObject(writer.toString());
+
+    Configuration conf = new Configuration(false);
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+    dag.validate(); // the unmodified topology declares a schema, so validation is expected to succeed
+
+    OperatorMeta operator1 = dag.getOperatorMeta("operator1");
+    assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass());
+
+    StreamMeta input1 = dag.getStream("inputStream");
+    assertNotNull(input1);
+    for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { // every sink port of the stream is checked
+      Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS));
+    }
+  }
+
+  @Test // validation must fail when a stream feeding a schema-required port declares no schema
+  public void testTupleClassAttrValidation() throws Exception
+  {
+    String resourcePath = "/schemaTestTopology.json";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    StringWriter writer = new StringWriter();
+
+    IOUtils.copy(is, writer);
+    JSONObject json = new JSONObject(writer.toString());
+
+    //remove the schema of the first (and only) stream so that validation fails
+    json.getJSONArray("streams").getJSONObject(0).remove("schema");
+    Configuration conf = new Configuration(false);
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+
+    try {
+      dag.validate();
+      Assert.fail(); // reached only if validate() did not throw
+    } catch (ValidationException ve) {
+      //the test passes: the expected validation exception was thrown.
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
 
   public static class TestApplication implements StreamingApplication {
@@ -789,7 +848,11 @@ public class LogicalPlanConfigurationTest {
         return false;
       return true;
     }
-    
+
+  }
+
+  public static class TestSchema // empty class; referenced by name as the stream schema in schemaTestTopology.json
+  {
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
new file mode 100644
index 0000000..59aaade
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.stram.plan;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.stram.engine.GenericTestOperator;
+
+public class SchemaTestOperator extends GenericTestOperator // test operator that adds one schema-required input port
+{
+  @InputPortFieldAnnotation(schemaRequired = true) // flags this port as requiring a schema
+  final public transient InputPort<Object> schemaRequiredPort = new DefaultInputPort<Object>()
+  {
+    @Override
+    final public void process(Object payload)
+    {
+    } // intentionally empty: incoming tuples are ignored
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 8f8b632..ad915c8 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -41,6 +41,8 @@ import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.InputOperator;
+
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -170,7 +172,7 @@ public class OperatorDiscoveryTest
     OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(classFilePath);
     operatorDiscoverer.buildTypeGraph();
 
-    // make sure (de)serialization of type graph works withtout problem
+    // make sure (de)serialization of type graph works without problem
     Kryo kryo = new Kryo();
     TypeGraph.TypeGraphSerializer tgs = new TypeGraph.TypeGraphSerializer();
     kryo.register(TypeGraph.class, tgs);
@@ -1033,4 +1035,64 @@ public class OperatorDiscoveryTest
     
   }
 
+  public static class SchemaRequiredOperator extends BaseOperator implements InputOperator // fixture described by testPortSchema
+  {
+    @OutputPortFieldAnnotation(schemaRequired = true) // explicitly schema-required
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+
+    @OutputPortFieldAnnotation(schemaRequired = false) // explicitly not schema-required
+    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+
+    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>(); // no annotation at all
+
+    @Override
+    public void emitTuples()
+    {
+    } // intentionally empty: this operator emits nothing
+  }
+
+  @Test // the operator description should report "schemaRequired" for each output port of SchemaRequiredOperator
+  public void testPortSchema() throws Exception
+  {
+    String[] classFilePath = getClassFileInClasspath();
+    OperatorDiscoverer od = new OperatorDiscoverer(classFilePath);
+    od.buildTypeGraph();
+    JSONObject operatorJson = od.describeOperator(SchemaRequiredOperator.class);
+    JSONArray portsJson = operatorJson.getJSONArray("outputPorts");
+
+    Assert.assertEquals("no. of ports", 3, portsJson.length());
+
+    for (int i = 0; i < portsJson.length(); i++) { // ports are matched by name; any other name is not checked
+      JSONObject portJson = portsJson.getJSONObject(i);
+      String name = portJson.getString("name");
+      if (name.equals("output")) {
+        Assert.assertEquals("output schema", true, portJson.getBoolean("schemaRequired"));
+      } else if (name.equals("output1")) {
+        Assert.assertEquals("output1 schema", false, portJson.getBoolean("schemaRequired"));
+      } else if (name.equals("output2")) { // the unannotated port is expected to report false as well
+        Assert.assertEquals("output2 schema", false, portJson.getBoolean("schemaRequired"));
+      }
+    }
+  }
+
+  @Test // checks the port class hierarchy and schema-class info built for SubSubClassGeneric
+  public void testAdditionalPortInfo() throws Exception
+  {
+    String[] classFilePath = getClassFileInClasspath();
+    OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(classFilePath);
+    operatorDiscoverer.buildTypeGraph();
+    JSONObject operator = operatorDiscoverer.describeOperator(SubSubClassGeneric.class);
+
+    JSONObject portClassHierarchy = new JSONObject();
+    JSONObject portsWithSchemaClasses = new JSONObject();
+    operatorDiscoverer.buildAdditionalPortInfo(operator, portClassHierarchy, portsWithSchemaClasses); // expected to fill both objects
+
+    JSONArray stringTypeArray = portClassHierarchy.optJSONArray("java.lang.String"); // null if no entry exists for String
+    Assert.assertNotNull("string hierarchy", stringTypeArray);
+
+    Assert.assertEquals("number of immediate ancestors", 4, stringTypeArray.length());
+
+    Assert.assertEquals("number of port types with schema", 0, portsWithSchemaClasses.length());
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/resources/schemaTestTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/schemaTestTopology.json b/engine/src/test/resources/schemaTestTopology.json
new file mode 100644
index 0000000..6c779fd
--- /dev/null
+++ b/engine/src/test/resources/schemaTestTopology.json
@@ -0,0 +1,43 @@
+{
+  "operators": [
+    {
+      "name": "inputOperator",
+      "class": "com.datatorrent.stram.engine.TestGeneratorInputOperator",
+      "properties": {
+        "com.datatorrent.stram.engine.TestGeneratorInputOperator": {
+          "myConfigProperty": "myConfigPropertyValue"
+        }
+      },
+      "ports": [
+        {
+          "name": "outport",
+          "attributes": {
+            "UNIFIER_LIMIT": 8
+          }
+        }
+      ]
+    },
+    {
+      "name": "operator1",
+      "class": "com.datatorrent.stram.plan.SchemaTestOperator"
+    }
+  ],
+  "streams": [
+    {
+      "name": "inputStream",
+      "source": {
+        "operatorName": "inputOperator",
+        "portName": "outport"
+      },
+      "sinks": [
+        {
+          "operatorName": "operator1",
+          "portName": "schemaRequiredPort"
+        }
+      ],
+      "schema": {
+        "class": "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema"
+      }
+    }
+  ]
+}