You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/05/04 00:16:57 UTC

[1/2] incubator-apex-core git commit: APEXCORE-330 : Getting stack trace from the containers, exposed through the REST API; the return value will be in JSON.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 752f4397d -> c2903da2f


APEXCORE-330 : Getting stack trace from the containers, exposed through the REST API;
the return value will be in JSON.

The stack trace can be accessed through the following call:
  "/ws/v2/stram/physicalPlan/containers/<containerId>/stackTrace"

Addressed review comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ac7d673a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ac7d673a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ac7d673a

Branch: refs/heads/master
Commit: ac7d673ad89e1fda27fcf418022dbd74430326c2
Parents: cae4dc5
Author: sandeshh <sa...@gmail.com>
Authored: Sun Apr 24 07:45:12 2016 -0700
Committer: sandeshh <sa...@gmail.com>
Committed: Tue May 3 15:01:04 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StramUtils.java  | 52 ++++++++++++++++++++
 .../stram/StreamingContainerAgent.java          |  9 ++++
 .../stram/StreamingContainerManager.java        |  5 ++
 .../StreamingContainerUmbilicalProtocol.java    |  3 ++
 .../java/com/datatorrent/stram/cli/ApexCli.java | 26 ++++++++++
 .../stram/engine/StreamingContainer.java        | 14 ++++++
 .../stram/webapp/StramWebServices.java          | 40 +++++++++++++++
 7 files changed, 149 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
index 8b413bc..a931253 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
@@ -21,6 +21,12 @@ package com.datatorrent.stram;
 
 import java.util.Map;
 
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.log4j.DTLoggerFactory;
 
@@ -35,6 +41,8 @@ import com.datatorrent.api.StreamingApplication;
  */
 public abstract class StramUtils
 {
+  private static final Logger LOG = LoggerFactory.getLogger(StramUtils.class);
+
   public static <T> Class<? extends T> classForName(String className, Class<T> superClass)
   {
     try {
@@ -81,4 +89,48 @@ public abstract class StramUtils
     }
   }
 
+  public static JSONObject getStackTrace()
+  {
+    Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+
+    JSONObject jsonObject = new JSONObject();
+    JSONArray jsonArray = new JSONArray();
+
+    for (Map.Entry<Thread, StackTraceElement[]> elements : stackTraces.entrySet()) {
+
+      JSONObject jsonThread = new JSONObject();
+
+      Thread thread = elements.getKey();
+
+      try {
+
+        jsonThread.put("name", thread.getName());
+        jsonThread.put("state", thread.getState());
+        jsonThread.put("id", thread.getId());
+
+        JSONArray stackTraceElements = new JSONArray();
+
+        for (StackTraceElement stackTraceElement : elements.getValue()) {
+
+          stackTraceElements.put(stackTraceElement.toString());
+        }
+
+        jsonThread.put("stackTraceElements", stackTraceElements);
+
+        jsonArray.put(jsonThread);
+      } catch (Exception ex) {
+        LOG.warn("Getting stack trace for the thread " + thread.getName() + " failed.");
+        continue;
+      }
+    }
+
+    try {
+      jsonObject.put("threads", jsonArray);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    return jsonObject;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index fc2fb17..598fea5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -96,6 +96,7 @@ public class StreamingContainerAgent
   }
 
   boolean shutdownRequested = false;
+  boolean stackTraceRequested = false;
 
   Set<PTOperator> deployOpers = Sets.newHashSet();
   Set<Integer> undeployOpers = Sets.newHashSet();
@@ -468,4 +469,12 @@ public class StreamingContainerAgent
     return ci;
   }
 
+  public String getStackTrace()
+  {
+
+    stackTraceRequested = true;
+    return containerStackTrace;
+  }
+
+  public volatile String containerStackTrace = null;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 37f63b2..b12709e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1489,6 +1489,8 @@ public class StreamingContainerManager implements PlanContext
       });
     }
 
+    sca.containerStackTrace = heartbeat.stackTrace;
+
     if (heartbeat.restartRequested) {
       LOG.error("Container {} restart request", sca.container.getExternalId());
       containerStopRequests.put(sca.container.getExternalId(), sca.container.getExternalId());
@@ -1820,6 +1822,9 @@ public class StreamingContainerManager implements PlanContext
     }
     rsp.nodeRequests = requests;
     rsp.committedWindowId = committedWindowId;
+    rsp.stackTraceRequired = sca.stackTraceRequested;
+    sca.stackTraceRequested = false;
+
     return rsp;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index d01d2b6..150e3b3 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -253,6 +253,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
       return stats.id;
     }
 
+    public String stackTrace;
   }
 
   /**
@@ -380,6 +381,8 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
      * Set when dag purges a particular windowId as it's processed by all the operators.
      */
     public long committedWindowId = -1;
+
+    public boolean stackTraceRequired = false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 87e4bdc..67406a3 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -770,6 +770,10 @@ public class ApexCli
         null,
         new Arg[]{new Arg("operator-id"), new Arg("start-time")},
         "Get tuple recording info"));
+    connectedCommands.put("get-container-stacktrace", new CommandSpec(new GetContainerStackTrace(),
+        null,
+        new Arg[]{new Arg("container-id")},
+        "Get the stack trace for the container"));
 
     //
     // Logical plan change command specification starts here
@@ -3366,6 +3370,28 @@ public class ApexCli
 
   }
 
+  private class GetContainerStackTrace implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      String containerLongId = getContainerLongId(args[1]);
+      if (containerLongId == null) {
+        throw new CliException("Container " + args[1] + " not found");
+      }
+
+      JSONObject response;
+      try {
+        response = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS + "/" + args[1] + "/" + StramWebServices.PATH_STACKTRACE, currentApp);
+      } catch (Exception ex) {
+        throw new CliException("Webservice call to AppMaster failed.", ex);
+      }
+
+      printJson(response);
+    }
+
+  }
+
   private class GetAppPackageInfoCommand implements Command
   {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 2436776..1953d7a 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -79,6 +79,7 @@ import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.stram.ComponentContextPair;
 import com.datatorrent.stram.RecoverableRpcProxy;
+import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.StramUtils.YarnContainerMain;
 import com.datatorrent.stram.StringCodecs;
 import com.datatorrent.stram.api.Checkpoint;
@@ -609,6 +610,7 @@ public class StreamingContainer extends YarnContainerMain
     long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
     long expiryTime = System.currentTimeMillis();
     final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    String stackTrace = null;
     Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
     while (iter.hasNext()) {
       Token<?> token = iter.next();
@@ -649,6 +651,7 @@ public class StreamingContainer extends YarnContainerMain
 
       ContainerHeartbeatResponse rsp;
       do {
+
         ContainerStats stats = new ContainerStats(containerId);
         // gather heartbeat info for all operators
         for (Map.Entry<Integer, Node<?>> e : nodes.entrySet()) {
@@ -690,8 +693,19 @@ public class StreamingContainer extends YarnContainerMain
         // heartbeat call and follow-up processing
         //logger.debug("Sending heartbeat for {} operators.", msg.getContainerStats().size());
         msg.sentTms = System.currentTimeMillis();
+
+        msg.stackTrace = stackTrace;
+
         rsp = umbilical.processHeartbeat(msg);
+
+        if (rsp.stackTraceRequired) {
+          stackTrace = StramUtils.getStackTrace().toString();
+        } else {
+          stackTrace = null;
+        }
+
         processHeartbeatResponse(rsp);
+
         if (rsp.hasPendingRequests) {
           logger.info("Waiting for pending request.");
           synchronized (this.heartbeatTrigger) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ac7d673a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 1047f12..52be922 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
@@ -80,6 +81,7 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StringCodec;
 import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.StreamingContainerAgent;
 import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.StringCodecs;
@@ -122,7 +124,11 @@ public class StramWebServices
   public static final String PATH_OPERATOR_CLASSES = "operatorClasses";
   public static final String PATH_ALERTS = "alerts";
   public static final String PATH_LOGGERS = "loggers";
+  public static final String PATH_STACKTRACE = "stackTrace";
   public static final long WAIT_TIME = 5000;
+  public static final long STACK_TRACE_WAIT_TIME = 1000;
+  public static final long STACK_TRACE_ATTEMPTS = 10;
+
 
   //public static final String PATH_ACTION_OPERATOR_CLASSES = "actionOperatorClasses";
   private StramAppContext appCtx;
@@ -492,6 +498,40 @@ public class StramWebServices
     return new JSONObject(objectMapper.writeValueAsString(ci));
   }
 
+  @GET
+  @Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/" + PATH_STACKTRACE)
+  @Produces(MediaType.APPLICATION_JSON)
+  public JSONObject getContainerStackTrace(@PathParam("containerId") String containerId) throws Exception
+  {
+    init();
+
+    if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) {
+      return StramUtils.getStackTrace();
+    }
+
+    StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
+
+    if (sca == null) {
+      throw new NotFoundException("Container not found.");
+    }
+
+    if (!sca.getContainerInfo().state.equals("ACTIVE")) {
+      throw new NotFoundException("Container is not active.");
+    }
+
+    for (int i = 0; i < STACK_TRACE_ATTEMPTS; ++i) {
+      String result = sca.getStackTrace();
+
+      if (result != null) {
+        return new JSONObject(result);
+      }
+
+      Thread.sleep(STACK_TRACE_WAIT_TIME);
+    }
+
+    throw new TimeoutException("Not able to get the stack trace");
+  }
+
   @POST // not supported by WebAppProxyServlet, can only be called directly
   @Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/kill")
   @Produces(MediaType.APPLICATION_JSON)


[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-330' of github.com:sandeshh/incubator-apex-core

Posted by da...@apache.org.
Merge branch 'APEXCORE-330' of github.com:sandeshh/incubator-apex-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c2903da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c2903da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c2903da2

Branch: refs/heads/master
Commit: c2903da2f19fab19728e8f6683fdf0424b4cabf8
Parents: 752f439 ac7d673
Author: David Yan <da...@datatorrent.com>
Authored: Tue May 3 15:06:07 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue May 3 15:06:07 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StramUtils.java  | 52 ++++++++++++++++++++
 .../stram/StreamingContainerAgent.java          |  9 ++++
 .../stram/StreamingContainerManager.java        |  5 ++
 .../StreamingContainerUmbilicalProtocol.java    |  3 ++
 .../java/com/datatorrent/stram/cli/ApexCli.java | 26 ++++++++++
 .../stram/engine/StreamingContainer.java        | 14 ++++++
 .../stram/webapp/StramWebServices.java          | 40 +++++++++++++++
 7 files changed, 149 insertions(+)
----------------------------------------------------------------------