You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/05/04 00:16:57 UTC
[1/2] incubator-apex-core git commit: APEXCORE-330 : Getting stack
trace from the containers, exposed through the REST API; the return value will be in JSON.
Repository: incubator-apex-core
Updated Branches:
refs/heads/master 752f4397d -> c2903da2f
APEXCORE-330 : Getting stack trace from the containers, exposed through the REST API;
the return value will be in JSON.
The stack trace can be accessed through the following call:
"/ws/v2/stram/physicalPlan/containers/<containerId>/stackTrace"
Addressed review comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ac7d673a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ac7d673a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ac7d673a
Branch: refs/heads/master
Commit: ac7d673ad89e1fda27fcf418022dbd74430326c2
Parents: cae4dc5
Author: sandeshh <sa...@gmail.com>
Authored: Sun Apr 24 07:45:12 2016 -0700
Committer: sandeshh <sa...@gmail.com>
Committed: Tue May 3 15:01:04 2016 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/stram/StramUtils.java | 52 ++++++++++++++++++++
.../stram/StreamingContainerAgent.java | 9 ++++
.../stram/StreamingContainerManager.java | 5 ++
.../StreamingContainerUmbilicalProtocol.java | 3 ++
.../java/com/datatorrent/stram/cli/ApexCli.java | 26 ++++++++++
.../stram/engine/StreamingContainer.java | 14 ++++++
.../stram/webapp/StramWebServices.java | 40 +++++++++++++++
7 files changed, 149 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
index 8b413bc..a931253 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
@@ -21,6 +21,12 @@ package com.datatorrent.stram;
import java.util.Map;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.log4j.DTLoggerFactory;
@@ -35,6 +41,8 @@ import com.datatorrent.api.StreamingApplication;
*/
public abstract class StramUtils
{
+ private static final Logger LOG = LoggerFactory.getLogger(StramUtils.class);
+
public static <T> Class<? extends T> classForName(String className, Class<T> superClass)
{
try {
@@ -81,4 +89,48 @@ public abstract class StramUtils
}
}
+  /**
+   * Captures the current stack trace of every live thread returned by
+   * {@link Thread#getAllStackTraces()} in this JVM.
+   *
+   * @return a JSON object with a single "threads" array; each element carries the thread's
+   *         "name", "state", "id" and "stackTraceElements" (one string per frame). A thread
+   *         whose entry fails to serialize is logged (with its cause) and left out of the array.
+   * @throws RuntimeException wrapping a JSONException if the top-level "threads" entry cannot be set
+   */
+  public static JSONObject getStackTrace()
+  {
+    Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+
+    JSONObject jsonObject = new JSONObject();
+    JSONArray jsonArray = new JSONArray();
+
+    for (Map.Entry<Thread, StackTraceElement[]> elements : stackTraces.entrySet()) {
+      Thread thread = elements.getKey();
+      try {
+        JSONObject jsonThread = new JSONObject();
+        jsonThread.put("name", thread.getName());
+        jsonThread.put("state", thread.getState());
+        jsonThread.put("id", thread.getId());
+
+        JSONArray stackTraceElements = new JSONArray();
+        for (StackTraceElement stackTraceElement : elements.getValue()) {
+          stackTraceElements.put(stackTraceElement.toString());
+        }
+        jsonThread.put("stackTraceElements", stackTraceElements);
+
+        jsonArray.put(jsonThread);
+      } catch (Exception ex) {
+        // Best effort: skip only this thread so the rest of the dump is still returned,
+        // but keep the cause in the log so the failure can be diagnosed.
+        LOG.warn("Getting stack trace for the thread {} failed.", thread.getName(), ex);
+      }
+    }
+
+    try {
+      jsonObject.put("threads", jsonArray);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    return jsonObject;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index fc2fb17..598fea5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -96,6 +96,7 @@ public class StreamingContainerAgent
}
boolean shutdownRequested = false;
+ boolean stackTraceRequested = false;
Set<PTOperator> deployOpers = Sets.newHashSet();
Set<Integer> undeployOpers = Sets.newHashSet();
@@ -468,4 +469,12 @@ public class StreamingContainerAgent
return ci;
}
+  /**
+   * Most recent stack trace (JSON text) delivered by this container's heartbeat, or null when
+   * the latest heartbeat did not carry one. Overwritten on every processed heartbeat.
+   */
+  public volatile String containerStackTrace = null;
+
+  /**
+   * Flags that the container should be asked for a stack trace in an upcoming heartbeat
+   * response, then hands back whatever trace was most recently received.
+   *
+   * NOTE(review): stackTraceRequested is a plain (non-volatile) field set here and read/cleared
+   * when building heartbeat responses; if those run on different threads, consider making it
+   * volatile — confirm threading model.
+   *
+   * @return the last received JSON stack trace, or null if none has arrived yet
+   */
+  public String getStackTrace()
+  {
+    stackTraceRequested = true;
+    return containerStackTrace;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 37f63b2..b12709e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1489,6 +1489,8 @@ public class StreamingContainerManager implements PlanContext
});
}
+ sca.containerStackTrace = heartbeat.stackTrace;
+
if (heartbeat.restartRequested) {
LOG.error("Container {} restart request", sca.container.getExternalId());
containerStopRequests.put(sca.container.getExternalId(), sca.container.getExternalId());
@@ -1820,6 +1822,9 @@ public class StreamingContainerManager implements PlanContext
}
rsp.nodeRequests = requests;
rsp.committedWindowId = committedWindowId;
+ rsp.stackTraceRequired = sca.stackTraceRequested;
+ sca.stackTraceRequested = false;
+
return rsp;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index d01d2b6..150e3b3 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -253,6 +253,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
return stats.id;
}
+ public String stackTrace;
}
/**
@@ -380,6 +381,8 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
* Set when dag purges a particular windowId as it's processed by all the operators.
*/
public long committedWindowId = -1;
+
+ public boolean stackTraceRequired = false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 87e4bdc..67406a3 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -770,6 +770,10 @@ public class ApexCli
null,
new Arg[]{new Arg("operator-id"), new Arg("start-time")},
"Get tuple recording info"));
+ connectedCommands.put("get-container-stacktrace", new CommandSpec(new GetContainerStackTrace(),
+ null,
+ new Arg[]{new Arg("container-id")},
+ "Get the stack trace for the container"));
//
// Logical plan change command specification starts here
@@ -3366,6 +3370,28 @@ public class ApexCli
}
+  /**
+   * CLI command "get-container-stacktrace": fetches a container's stack trace from the
+   * application master's REST API and prints it as JSON.
+   */
+  private class GetContainerStackTrace implements Command
+  {
+    /**
+     * @param args args[1] is the container id as typed by the user
+     * @param reader console reader (unused)
+     * @throws CliException if the container cannot be resolved or the web service call fails
+     */
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      // NOTE(review): presumably resolves a user-typed (possibly abbreviated) id to the full id.
+      String containerLongId = getContainerLongId(args[1]);
+      if (containerLongId == null) {
+        throw new CliException("Container " + args[1] + " not found");
+      }
+
+      JSONObject response;
+      try {
+        // Use the resolved id: the server looks the container agent up by its exact id.
+        response = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS + "/" + containerLongId + "/" + StramWebServices.PATH_STACKTRACE, currentApp);
+      } catch (Exception ex) {
+        throw new CliException("Webservice call to AppMaster failed.", ex);
+      }
+
+      printJson(response);
+    }
+  }
+
private class GetAppPackageInfoCommand implements Command
{
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 2436776..1953d7a 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -79,6 +79,7 @@ import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.RecoverableRpcProxy;
+import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.StramUtils.YarnContainerMain;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.api.Checkpoint;
@@ -609,6 +610,7 @@ public class StreamingContainer extends YarnContainerMain
long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
long expiryTime = System.currentTimeMillis();
final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ String stackTrace = null;
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
@@ -649,6 +651,7 @@ public class StreamingContainer extends YarnContainerMain
ContainerHeartbeatResponse rsp;
do {
+
ContainerStats stats = new ContainerStats(containerId);
// gather heartbeat info for all operators
for (Map.Entry<Integer, Node<?>> e : nodes.entrySet()) {
@@ -690,8 +693,19 @@ public class StreamingContainer extends YarnContainerMain
// heartbeat call and follow-up processing
//logger.debug("Sending heartbeat for {} operators.", msg.getContainerStats().size());
msg.sentTms = System.currentTimeMillis();
+
+ msg.stackTrace = stackTrace;
+
rsp = umbilical.processHeartbeat(msg);
+
+ if (rsp.stackTraceRequired) {
+ stackTrace = StramUtils.getStackTrace().toString();
+ } else {
+ stackTrace = null;
+ }
+
processHeartbeatResponse(rsp);
+
if (rsp.hasPendingRequests) {
logger.info("Waiting for pending request.");
synchronized (this.heartbeatTrigger) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 1047f12..52be922 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@@ -80,6 +81,7 @@ import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StringCodecs;
@@ -122,7 +124,11 @@ public class StramWebServices
public static final String PATH_OPERATOR_CLASSES = "operatorClasses";
public static final String PATH_ALERTS = "alerts";
public static final String PATH_LOGGERS = "loggers";
+ public static final String PATH_STACKTRACE = "stackTrace";
public static final long WAIT_TIME = 5000;
+ public static final long STACK_TRACE_WAIT_TIME = 1000;
+ public static final long STACK_TRACE_ATTEMPTS = 10;
+
//public static final String PATH_ACTION_OPERATOR_CLASSES = "actionOperatorClasses";
private StramAppContext appCtx;
@@ -492,6 +498,40 @@ public class StramWebServices
return new JSONObject(objectMapper.writeValueAsString(ci));
}
+  /**
+   * Returns a JSON stack-trace dump for the given container.
+   *
+   * If the id matches the CONTAINER_ID of the process serving this request, the dump is taken
+   * in-process. Otherwise a trace is requested from the container's agent, and this call polls
+   * for it up to STACK_TRACE_ATTEMPTS times, waiting STACK_TRACE_WAIT_TIME ms between attempts.
+   *
+   * @param containerId container id taken from the request path
+   * @return the container's stack-trace dump
+   * @throws NotFoundException if the container is unknown or its state is not ACTIVE
+   * @throws TimeoutException if no trace arrived within the polling window
+   */
+  @GET
+  @Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/" + PATH_STACKTRACE)
+  @Produces(MediaType.APPLICATION_JSON)
+  public JSONObject getContainerStackTrace(@PathParam("containerId") String containerId) throws Exception
+  {
+    init();
+
+    // Request targets the container this process runs in: dump its threads directly.
+    if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) {
+      return StramUtils.getStackTrace();
+    }
+
+    StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
+    if (sca == null) {
+      throw new NotFoundException("Container not found.");
+    }
+
+    if (!"ACTIVE".equals(sca.getContainerInfo().state)) {
+      throw new NotFoundException("Container is not active.");
+    }
+
+    for (int i = 0; i < STACK_TRACE_ATTEMPTS; ++i) {
+      // Each call re-flags the request; the trace is delivered by a later heartbeat.
+      String result = sca.getStackTrace();
+      if (result != null) {
+        return new JSONObject(result);
+      }
+      // Do not wait after the final attempt; we are about to give up anyway.
+      if (i + 1 < STACK_TRACE_ATTEMPTS) {
+        Thread.sleep(STACK_TRACE_WAIT_TIME);
+      }
+    }
+
+    throw new TimeoutException("Not able to get the stack trace");
+  }
+
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/kill")
@Produces(MediaType.APPLICATION_JSON)
[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-330' of
github.com:sandeshh/incubator-apex-core
Posted by da...@apache.org.
Merge branch 'APEXCORE-330' of github.com:sandeshh/incubator-apex-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c2903da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c2903da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c2903da2
Branch: refs/heads/master
Commit: c2903da2f19fab19728e8f6683fdf0424b4cabf8
Parents: 752f439 ac7d673
Author: David Yan <da...@datatorrent.com>
Authored: Tue May 3 15:06:07 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue May 3 15:06:07 2016 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/stram/StramUtils.java | 52 ++++++++++++++++++++
.../stram/StreamingContainerAgent.java | 9 ++++
.../stram/StreamingContainerManager.java | 5 ++
.../StreamingContainerUmbilicalProtocol.java | 3 ++
.../java/com/datatorrent/stram/cli/ApexCli.java | 26 ++++++++++
.../stram/engine/StreamingContainer.java | 14 ++++++
.../stram/webapp/StramWebServices.java | 40 +++++++++++++++
7 files changed, 149 insertions(+)
----------------------------------------------------------------------