You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:33 UTC

[14/50] incubator-apex-core git commit: APEXCORE-144, APEXCORE-145 Rest api changes to view module information

APEXCORE-144, APEXCORE-145 Rest api changes to view module information


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/47b3ce8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/47b3ce8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/47b3ce8e

Branch: refs/heads/master
Commit: 47b3ce8e4019f325c2e91968cb120763f18bb174
Parents: c1314ea
Author: shubham <sh...@github.com>
Authored: Tue Nov 17 12:10:08 2015 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Dec 22 12:03:55 2015 +0530

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        |  20 +++
 .../java/com/datatorrent/stram/cli/DTCli.java   |   8 +-
 .../stram/codec/LogicalPlanSerializer.java      |  47 ++++++-
 .../stram/plan/logical/LogicalPlan.java         |   7 +-
 .../plan/logical/LogicalPlanConfiguration.java  |   4 +-
 .../stram/webapp/StramWebServices.java          | 123 +++++++++++++------
 6 files changed, 158 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 29c6a2c..162245b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -95,6 +95,7 @@ import com.datatorrent.stram.engine.WindowGenerator;
 import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -2246,6 +2247,25 @@ public class StreamingContainerManager implements PlanContext
     return fillLogicalOperatorInfo(operatorMeta);
   }
 
+  public ModuleMeta getModuleMeta(String moduleName)
+  {
+    return getModuleMeta(moduleName, getLogicalPlan());
+  }
+
+  private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag)
+  {
+    for (ModuleMeta m : dag.getAllModules()) {
+      if (m.getFullName().equals(moduleName)) {
+        return m;
+      }
+      ModuleMeta res = getModuleMeta(moduleName, m.getDag());
+      if (res != null) {
+        return res;
+      }
+    }
+    return null;
+  }
+
   public List<LogicalOperatorInfo> getLogicalOperatorInfoList()
   {
     List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
index deb0967..696f497 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
@@ -2824,7 +2824,7 @@ public class DTCli
               }
               LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
               map.put("applicationName", appFactory.getName());
-              map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+              map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
             } finally {
               if (raw) {
                 System.setOut(originalStream);
@@ -2840,7 +2840,7 @@ public class DTCli
             LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
             Map<String, Object> map = new HashMap<String, Object>();
             map.put("applicationName", appFactory.getName());
-            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
             printJson(map);
           } else if (filename.endsWith(".properties")) {
             File file = new File(filename);
@@ -2849,7 +2849,7 @@ public class DTCli
             LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
             Map<String, Object> map = new HashMap<String, Object>();
             map.put("applicationName", appFactory.getName());
-            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
             printJson(map);
           } else {
             StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
@@ -2893,7 +2893,7 @@ public class DTCli
               Map<String, Object> map = new HashMap<String, Object>();
               map.put("applicationName", appInfo.name);
               if (appInfo.dag != null) {
-                map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag));
+                map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false));
               }
               if (appInfo.error != null) {
                 map.put("error", appInfo.error);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 90dd2b5..7b61d5b 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -88,7 +88,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
    * @param dag
    * @return
    */
-  public static Map<String, Object> convertToMap(LogicalPlan dag)
+  public static Map<String, Object> convertToMap(LogicalPlan dag, boolean includeModules)
   {
     HashMap<String, Object> result = new HashMap<String, Object>();
     ArrayList<Object> operatorArray = new ArrayList< Object>();
@@ -200,6 +200,15 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
         streamDetailMap.put("locality", streamMeta.getLocality().name());
       }
     }
+
+    if (includeModules) {
+      ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+      result.put("modules", modulesArray);
+      for(LogicalPlan.ModuleMeta meta : dag.getAllModules()) {
+        modulesArray.add(getLogicalModuleDetails(dag, meta));
+      }
+    }
+
     return result;
   }
 
@@ -323,13 +332,43 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
 
   public static JSONObject convertToJsonObject(LogicalPlan dag)
   {
-    return new JSONObject(convertToMap(dag));
+    return new JSONObject(convertToMap(dag, false));
   }
 
   @Override
-  public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, JsonProcessingException
+  public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException,
+      JsonProcessingException
   {
-    jg.writeObject(convertToMap(dag));
+    jg.writeObject(convertToMap(dag, false));
+  }
+
+  /**
+   * Return information about operators and inner modules of a module.
+   *
+   * @param dag        top level DAG
+   * @param moduleMeta module information. DAG within module is used for constructing response.
+   * @return
+   */
+  private static Map<String, Object> getLogicalModuleDetails(LogicalPlan dag, LogicalPlan.ModuleMeta moduleMeta)
+  {
+    Map<String, Object> moduleDetailMap = new HashMap<String, Object>();
+    ArrayList<String> operatorArray = new ArrayList<>();
+    moduleDetailMap.put("name", moduleMeta.getName());
+    moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName());
+
+    moduleDetailMap.put("operators", operatorArray);
+    for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) {
+      if (operatorMeta.getModuleName() == null) {
+        String fullName = moduleMeta.getFullName() + LogicalPlan.MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+        operatorArray.add(fullName);
+      }
+    }
+    ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+    moduleDetailMap.put("modules", modulesArray);
+    for (LogicalPlan.ModuleMeta meta : moduleMeta.getDag().getAllModules()) {
+      modulesArray.add(getLogicalModuleDetails(dag, meta));
+    }
+    return moduleDetailMap;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 21039cc..377fa6d 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -32,6 +32,7 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
@@ -1090,9 +1091,9 @@ public class LogicalPlan implements Serializable, DAG
     }
 
     // Avoid name conflict with module.
-    if (modules.containsKey(name))
+    if (modules.containsKey(name)) {
       throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
-
+    }
     OperatorMeta decl = new OperatorMeta(name, operator);
     rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
     operators.put(name, decl);
@@ -1193,7 +1194,7 @@ public class LogicalPlan implements Serializable, DAG
      * @param parentDAG parent dag to populate with operators from this and inner modules.
      * @param conf      configuration object.
      */
-    public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf)
+    public void flattenModule(LogicalPlan parentDAG, Configuration conf)
     {
       module.populateDAG(dag, conf);
       for (ModuleMeta subModuleMeta : dag.getAllModules()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 483576a..dbd3bc3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.stram.plan.logical;
 
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +32,7 @@ import java.lang.reflect.Type;
 import java.util.*;
 import java.util.Map.Entry;
 
+
 import javax.validation.ValidationException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -45,7 +47,6 @@ import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections.CollectionUtils;
@@ -60,6 +61,7 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
 import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 6fdba00..fd47d35 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -68,6 +68,7 @@ import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.StringCodecs;
 import com.datatorrent.stram.codec.LogicalPlanSerializer;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
@@ -561,17 +562,14 @@ public class StramWebServices
         LOG.debug("Setting property for {}: {}={}", operatorName, key, val);
         dagManager.setOperatorProperty(operatorName, key, val);
       }
-    }
-    catch (JSONException ex) {
+    } catch (JSONException ex) {
       LOG.warn("Got JSON Exception: ", ex);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.error("Caught exception: ", ex);
       throw new RuntimeException(ex);
     }
     return response;
   }
-
   @POST // not supported by WebAppProxyServlet, can only be called directly
   @Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties")
   @Consumes(MediaType.APPLICATION_JSON)
@@ -633,60 +631,97 @@ public class StramWebServices
   public JSONObject getPorts(@PathParam("operatorName") String operatorName)
   {
     OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    Set<LogicalPlan.InputPortMeta> inputPorts;
+    Set<LogicalPlan.OutputPortMeta> outputPorts;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      inputPorts = logicalModule.getInputStreams().keySet();
+      outputPorts = logicalModule.getOutputStreams().keySet();
+    } else {
+      inputPorts = logicalOperator.getInputStreams().keySet();
+      outputPorts = logicalOperator.getOutputStreams().keySet();
     }
+
+    JSONObject result = getPortsObjects(inputPorts, outputPorts);
+    return result;
+  }
+
+  private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs)
+  {
     JSONObject result = new JSONObject();
     JSONArray ports = new JSONArray();
     try {
-      for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) {
+      for (LogicalPlan.InputPortMeta inputPort : inputs) {
         JSONObject port = new JSONObject();
         port.put("name", inputPort.getPortName());
         port.put("type", "input");
         ports.put(port);
       }
-      for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) {
+      for (LogicalPlan.OutputPortMeta outputPort : outputs) {
         JSONObject port = new JSONObject();
         port.put("name", outputPort.getPortName());
         port.put("type", "output");
         ports.put(port);
       }
       result.put("ports", ports);
-    }
-    catch (JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
     return result;
   }
 
+  private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs,
+                                   String portName) throws JSONException
+  {
+    for (LogicalPlan.InputPortMeta inputPort : inputs) {
+      if (inputPort.getPortName().equals(portName)) {
+        JSONObject port = new JSONObject();
+        port.put("name", inputPort.getPortName());
+        port.put("type", "input");
+        return port;
+      }
+    }
+    for (LogicalPlan.OutputPortMeta outputPort : outputs) {
+      if (outputPort.getPortName().equals(portName)) {
+        JSONObject port = new JSONObject();
+        port.put("name", outputPort.getPortName());
+        port.put("type", "output");
+        return port;
+      }
+    }
+    return null;
+  }
+
+
   @GET
   @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}")
   @Produces(MediaType.APPLICATION_JSON)
   public JSONObject getPort(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName)
   {
     OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    Set<LogicalPlan.InputPortMeta> inputPorts;
+    Set<LogicalPlan.OutputPortMeta> outputPorts;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      inputPorts = logicalModule.getInputStreams().keySet();
+      outputPorts = logicalModule.getOutputStreams().keySet();
+    } else {
+      inputPorts = logicalOperator.getInputStreams().keySet();
+      outputPorts = logicalOperator.getOutputStreams().keySet();
     }
+
     try {
-      for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) {
-        if (portName.equals(portName)) {
-          JSONObject port = new JSONObject();
-          port.put("name", inputPort.getPortName());
-          port.put("type", "input");
-          return port;
-        }
-      }
-      for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) {
-        if (portName.equals(portName)) {
-          JSONObject port = new JSONObject();
-          port.put("name", outputPort.getPortName());
-          port.put("type", "output");
-          return port;
-        }
+      JSONObject resp = getPortObject(inputPorts, outputPorts, portName);
+      if (resp != null) {
+        return resp;
       }
-    }
-    catch (JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
     throw new NotFoundException();
@@ -711,31 +746,41 @@ public class StramWebServices
   {
     init();
     OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    BeanMap operatorProperties = null;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
+    } else {
+      operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
     }
 
-    BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
+    Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties);
+    return new JSONObject(objectMapper.writeValueAsString(m));
+  }
+
+  private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") String propertyName, BeanMap operatorProperties)
+  {
     Map<String, Object> m = new HashMap<String, Object>();
     @SuppressWarnings("rawtypes")
     Iterator entryIterator = operatorProperties.entryIterator();
     while (entryIterator.hasNext()) {
       try {
         @SuppressWarnings("unchecked")
-        Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next();
+        Entry<String, Object> entry = (Entry<String, Object>)entryIterator.next();
         if (propertyName == null) {
           m.put(entry.getKey(), entry.getValue());
-        }
-        else if (propertyName.equals(entry.getKey())) {
+        } else if (propertyName.equals(entry.getKey())) {
           m.put(entry.getKey(), entry.getValue());
           break;
         }
-      }
-      catch (Exception ex) {
+      } catch (Exception ex) {
         LOG.warn("Caught exception", ex);
       }
     }
-    return new JSONObject(objectMapper.writeValueAsString(m));
+    return m;
   }
 
   @GET
@@ -765,10 +810,10 @@ public class StramWebServices
   @GET
   @Path(PATH_LOGICAL_PLAN)
   @Produces(MediaType.APPLICATION_JSON)
-  public JSONObject getLogicalPlan() throws JSONException, IOException
+  public JSONObject getLogicalPlan(@QueryParam("includeModules") String includeModules) throws JSONException, IOException
   {
-    LogicalPlan lp = dagManager.getLogicalPlan();
-    return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(lp)));
+    return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(
+        dagManager.getLogicalPlan(), includeModules != null)));
   }
 
   @POST // not supported by WebAppProxyServlet, can only be called directly