You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:33 UTC
[14/50] incubator-apex-core git commit: APEXCORE-144,
APEXCORE-145 Rest api changes to view module information
APEXCORE-144, APEXCORE-145 Rest api changes to view module information
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/47b3ce8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/47b3ce8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/47b3ce8e
Branch: refs/heads/master
Commit: 47b3ce8e4019f325c2e91968cb120763f18bb174
Parents: c1314ea
Author: shubham <sh...@github.com>
Authored: Tue Nov 17 12:10:08 2015 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Dec 22 12:03:55 2015 +0530
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 20 +++
.../java/com/datatorrent/stram/cli/DTCli.java | 8 +-
.../stram/codec/LogicalPlanSerializer.java | 47 ++++++-
.../stram/plan/logical/LogicalPlan.java | 7 +-
.../plan/logical/LogicalPlanConfiguration.java | 4 +-
.../stram/webapp/StramWebServices.java | 123 +++++++++++++------
6 files changed, 158 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 29c6a2c..162245b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -95,6 +95,7 @@ import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -2246,6 +2247,25 @@ public class StreamingContainerManager implements PlanContext
return fillLogicalOperatorInfo(operatorMeta);
}
+ public ModuleMeta getModuleMeta(String moduleName)
+ {
+ return getModuleMeta(moduleName, getLogicalPlan());
+ }
+
+ private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag)
+ {
+ for (ModuleMeta m : dag.getAllModules()) {
+ if (m.getFullName().equals(moduleName)) {
+ return m;
+ }
+ ModuleMeta res = getModuleMeta(moduleName, m.getDag());
+ if (res != null) {
+ return res;
+ }
+ }
+ return null;
+ }
+
public List<LogicalOperatorInfo> getLogicalOperatorInfoList()
{
List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
index deb0967..696f497 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
@@ -2824,7 +2824,7 @@ public class DTCli
}
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
map.put("applicationName", appFactory.getName());
- map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+ map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
} finally {
if (raw) {
System.setOut(originalStream);
@@ -2840,7 +2840,7 @@ public class DTCli
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
Map<String, Object> map = new HashMap<String, Object>();
map.put("applicationName", appFactory.getName());
- map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+ map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
printJson(map);
} else if (filename.endsWith(".properties")) {
File file = new File(filename);
@@ -2849,7 +2849,7 @@ public class DTCli
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
Map<String, Object> map = new HashMap<String, Object>();
map.put("applicationName", appFactory.getName());
- map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan));
+ map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
printJson(map);
} else {
StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
@@ -2893,7 +2893,7 @@ public class DTCli
Map<String, Object> map = new HashMap<String, Object>();
map.put("applicationName", appInfo.name);
if (appInfo.dag != null) {
- map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag));
+ map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false));
}
if (appInfo.error != null) {
map.put("error", appInfo.error);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 90dd2b5..7b61d5b 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -88,7 +88,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
* @param dag
* @return
*/
- public static Map<String, Object> convertToMap(LogicalPlan dag)
+ public static Map<String, Object> convertToMap(LogicalPlan dag, boolean includeModules)
{
HashMap<String, Object> result = new HashMap<String, Object>();
ArrayList<Object> operatorArray = new ArrayList< Object>();
@@ -200,6 +200,15 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
streamDetailMap.put("locality", streamMeta.getLocality().name());
}
}
+
+ if (includeModules) {
+ ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+ result.put("modules", modulesArray);
+ for(LogicalPlan.ModuleMeta meta : dag.getAllModules()) {
+ modulesArray.add(getLogicalModuleDetails(dag, meta));
+ }
+ }
+
return result;
}
@@ -323,13 +332,43 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
public static JSONObject convertToJsonObject(LogicalPlan dag)
{
- return new JSONObject(convertToMap(dag));
+ return new JSONObject(convertToMap(dag, false));
}
@Override
- public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, JsonProcessingException
+ public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException,
+ JsonProcessingException
{
- jg.writeObject(convertToMap(dag));
+ jg.writeObject(convertToMap(dag, false));
+ }
+
+ /**
+ * Return information about operators and inner modules of a module.
+ *
+ * @param dag top level DAG
+ * @param moduleMeta module information. DAG within module is used for constructing response.
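+ * @return map with keys "name", "className", "operators" (qualified names of operators in the module's DAG whose module name is null) and "modules" (details of nested modules, built recursively)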
+ */
+ private static Map<String, Object> getLogicalModuleDetails(LogicalPlan dag, LogicalPlan.ModuleMeta moduleMeta)
+ {
+ Map<String, Object> moduleDetailMap = new HashMap<String, Object>();
+ ArrayList<String> operatorArray = new ArrayList<>();
+ moduleDetailMap.put("name", moduleMeta.getName());
+ moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName());
+
+ moduleDetailMap.put("operators", operatorArray);
+ for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) {
+ if (operatorMeta.getModuleName() == null) {
+ String fullName = moduleMeta.getFullName() + LogicalPlan.MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+ operatorArray.add(fullName);
+ }
+ }
+ ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+ moduleDetailMap.put("modules", modulesArray);
+ for (LogicalPlan.ModuleMeta meta : moduleMeta.getDag().getAllModules()) {
+ modulesArray.add(getLogicalModuleDetails(dag, meta));
+ }
+ return moduleDetailMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 21039cc..377fa6d 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -32,6 +32,7 @@ import javax.validation.constraints.NotNull;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
@@ -1090,9 +1091,9 @@ public class LogicalPlan implements Serializable, DAG
}
// Avoid name conflict with module.
- if (modules.containsKey(name))
+ if (modules.containsKey(name)) {
throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
-
+ }
OperatorMeta decl = new OperatorMeta(name, operator);
rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
operators.put(name, decl);
@@ -1193,7 +1194,7 @@ public class LogicalPlan implements Serializable, DAG
* @param parentDAG parent dag to populate with operators from this and inner modules.
* @param conf configuration object.
*/
- public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf)
+ public void flattenModule(LogicalPlan parentDAG, Configuration conf)
{
module.populateDAG(dag, conf);
for (ModuleMeta subModuleMeta : dag.getAllModules()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 483576a..dbd3bc3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.stram.plan.logical;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -31,6 +32,7 @@ import java.lang.reflect.Type;
import java.util.*;
import java.util.Map.Entry;
+
import javax.validation.ValidationException;
import com.google.common.annotations.VisibleForTesting;
@@ -45,7 +47,6 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -60,6 +61,7 @@ import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 6fdba00..fd47d35 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -68,6 +68,7 @@ import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.codec.LogicalPlanSerializer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
@@ -561,17 +562,14 @@ public class StramWebServices
LOG.debug("Setting property for {}: {}={}", operatorName, key, val);
dagManager.setOperatorProperty(operatorName, key, val);
}
- }
- catch (JSONException ex) {
+ } catch (JSONException ex) {
LOG.warn("Got JSON Exception: ", ex);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.error("Caught exception: ", ex);
throw new RuntimeException(ex);
}
return response;
}
-
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties")
@Consumes(MediaType.APPLICATION_JSON)
@@ -633,60 +631,97 @@ public class StramWebServices
public JSONObject getPorts(@PathParam("operatorName") String operatorName)
{
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+ Set<LogicalPlan.InputPortMeta> inputPorts;
+ Set<LogicalPlan.OutputPortMeta> outputPorts;
if (logicalOperator == null) {
- throw new NotFoundException();
+ ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+ if (logicalModule == null) {
+ throw new NotFoundException();
+ }
+ inputPorts = logicalModule.getInputStreams().keySet();
+ outputPorts = logicalModule.getOutputStreams().keySet();
+ } else {
+ inputPorts = logicalOperator.getInputStreams().keySet();
+ outputPorts = logicalOperator.getOutputStreams().keySet();
}
+
+ JSONObject result = getPortsObjects(inputPorts, outputPorts);
+ return result;
+ }
+
+ private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs)
+ {
JSONObject result = new JSONObject();
JSONArray ports = new JSONArray();
try {
- for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) {
+ for (LogicalPlan.InputPortMeta inputPort : inputs) {
JSONObject port = new JSONObject();
port.put("name", inputPort.getPortName());
port.put("type", "input");
ports.put(port);
}
- for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) {
+ for (LogicalPlan.OutputPortMeta outputPort : outputs) {
JSONObject port = new JSONObject();
port.put("name", outputPort.getPortName());
port.put("type", "output");
ports.put(port);
}
result.put("ports", ports);
- }
- catch (JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
return result;
}
+ private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs,
+ String portName) throws JSONException
+ {
+ for (LogicalPlan.InputPortMeta inputPort : inputs) {
+ if (inputPort.getPortName().equals(portName)) {
+ JSONObject port = new JSONObject();
+ port.put("name", inputPort.getPortName());
+ port.put("type", "input");
+ return port;
+ }
+ }
+ for (LogicalPlan.OutputPortMeta outputPort : outputs) {
+ if (outputPort.getPortName().equals(portName)) {
+ JSONObject port = new JSONObject();
+ port.put("name", outputPort.getPortName());
+ port.put("type", "output");
+ return port;
+ }
+ }
+ return null;
+ }
+
+
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPort(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName)
{
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+ Set<LogicalPlan.InputPortMeta> inputPorts;
+ Set<LogicalPlan.OutputPortMeta> outputPorts;
if (logicalOperator == null) {
- throw new NotFoundException();
+ ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+ if (logicalModule == null) {
+ throw new NotFoundException();
+ }
+ inputPorts = logicalModule.getInputStreams().keySet();
+ outputPorts = logicalModule.getOutputStreams().keySet();
+ } else {
+ inputPorts = logicalOperator.getInputStreams().keySet();
+ outputPorts = logicalOperator.getOutputStreams().keySet();
}
+
try {
- for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) {
- if (portName.equals(portName)) {
- JSONObject port = new JSONObject();
- port.put("name", inputPort.getPortName());
- port.put("type", "input");
- return port;
- }
- }
- for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) {
- if (portName.equals(portName)) {
- JSONObject port = new JSONObject();
- port.put("name", outputPort.getPortName());
- port.put("type", "output");
- return port;
- }
+ JSONObject resp = getPortObject(inputPorts, outputPorts, portName);
+ if (resp != null) {
+ return resp;
}
- }
- catch (JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
throw new NotFoundException();
@@ -711,31 +746,41 @@ public class StramWebServices
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+ BeanMap operatorProperties = null;
if (logicalOperator == null) {
- throw new NotFoundException();
+ ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+ if (logicalModule == null) {
+ throw new NotFoundException();
+ }
+ operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
+ } else {
+ operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
}
- BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
+ Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties);
+ return new JSONObject(objectMapper.writeValueAsString(m));
+ }
+
+ private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") String propertyName, BeanMap operatorProperties)
+ {
Map<String, Object> m = new HashMap<String, Object>();
@SuppressWarnings("rawtypes")
Iterator entryIterator = operatorProperties.entryIterator();
while (entryIterator.hasNext()) {
try {
@SuppressWarnings("unchecked")
- Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next();
+ Entry<String, Object> entry = (Entry<String, Object>)entryIterator.next();
if (propertyName == null) {
m.put(entry.getKey(), entry.getValue());
- }
- else if (propertyName.equals(entry.getKey())) {
+ } else if (propertyName.equals(entry.getKey())) {
m.put(entry.getKey(), entry.getValue());
break;
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.warn("Caught exception", ex);
}
}
- return new JSONObject(objectMapper.writeValueAsString(m));
+ return m;
}
@GET
@@ -765,10 +810,10 @@ public class StramWebServices
@GET
@Path(PATH_LOGICAL_PLAN)
@Produces(MediaType.APPLICATION_JSON)
- public JSONObject getLogicalPlan() throws JSONException, IOException
+ public JSONObject getLogicalPlan(@QueryParam("includeModules") String includeModules) throws JSONException, IOException
{
- LogicalPlan lp = dagManager.getLogicalPlan();
- return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(lp)));
+ return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(
+ dagManager.getLogicalPlan(), includeModules != null)));
}
@POST // not supported by WebAppProxyServlet, can only be called directly