You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2014/12/08 20:21:41 UTC
[6/9] git commit: updated refs/heads/trunk to 8675c84
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
new file mode 100644
index 0000000..85b72bd
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.giraph.debugger.mock.ComputationComputeTestGenerator;
+import org.apache.giraph.debugger.mock.MasterComputeTestGenerator;
+import org.apache.giraph.debugger.mock.TestGraphGenerator;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Entry point to the HTTP Debugger Server.
+ */
+public class Server {
+
+ /**
+ * Logger for the class.
+ */
+ private static final Logger LOG = Logger.getLogger(Server.class);
+ /**
+ * Default port number for the server.
+ */
+ private static final int SERVER_PORT = Integer.parseInt(System.getProperty(
+ "giraph.debugger.guiPort", "8000"));
+
+ /**
+ * Private constructor to disallow construction outside of the class.
+ */
+ private Server() { }
+
+ /**
+ * @param args command line arguments for the server
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ HttpServer server = HttpServer
+ .create(new InetSocketAddress(SERVER_PORT), 0);
+ // Attach JobHandler instance to handle /job GET call.
+ server.createContext("/vertices", new GetVertices());
+ server.createContext("/supersteps", new GetSupersteps());
+ server.createContext("/scenario", new GetScenario());
+ server.createContext("/integrity", new GetIntegrity());
+ server.createContext("/test/vertex", new GetVertexTest());
+ server.createContext("/test/master", new GetMasterTest());
+ server.createContext("/test/graph", new GetTestGraph());
+ server.createContext("/", new GetEditor());
+ // Creates a default executor.
+ server.setExecutor(null);
+ server.start();
+ }
+
+ /**
+ * Handler when accessing the landing page for the server.
+ */
+ static class GetEditor implements HttpHandler {
+
+ @Override
+ public void handle(HttpExchange t) {
+ URI uri = t.getRequestURI();
+ try {
+ try {
+ String path = uri.getPath();
+ LOG.debug(path);
+ if (path.endsWith("/")) {
+ path += "index.html";
+ }
+ path = path.replaceFirst("^/", "");
+ LOG.debug("resource path to look for = " + path);
+ LOG.debug("resource URL = " + getClass().getResource(path));
+ InputStream fs = getClass().getResourceAsStream(path);
+ if (fs == null) {
+ // Object does not exist or is not a file: reject
+ // with 404 error.
+ String response = "404 (Not Found)\n";
+ t.sendResponseHeaders(404, response.length());
+ OutputStream os = t.getResponseBody();
+ os.write(response.getBytes());
+ os.close();
+ } else {
+ // Object exists and is a file: accept with response
+ // code 200.
+ t.sendResponseHeaders(200, 0);
+ OutputStream os = t.getResponseBody();
+ final byte[] buffer = new byte[0x10000];
+ int count = 0;
+ while ((count = fs.read(buffer)) >= 0) {
+ os.write(buffer, 0, count);
+ }
+ fs.close();
+ os.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ t.sendResponseHeaders(404, 0);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Returns the list of vertices debugged in a given Superstep for a given job.
+ *
+ * URL parameters: {jobId, superstepId}
+ */
+ static class GetVertices extends ServerHttpHandler {
+ @Override
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+ // CHECKSTYLE: stop IllegalCatch
+ try {
+ // jobId and superstepId are mandatory. Validate.
+ if (jobId == null || superstepId == null) {
+ throw new IllegalArgumentException("Missing mandatory params.");
+ }
+ List<String> vertexIds = null;
+ // May throw NumberFormatException. Handled below.
+ long superstepNo = Long.parseLong(superstepId);
+ if (superstepNo < -1) {
+ throw new NumberFormatException("Superstep must be integer >= -1.");
+ }
+ // May throw IOException. Handled below.
+ vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
+ DebugTrace.VERTEX_ALL);
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ // Returns output as an array ["id1", "id2", "id3" .... ]
+ this.response = new JSONArray(vertexIds).toString();
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s is a mandatory parameter.",
+ ServerUtils.JOB_ID_KEY));
+ }
+ // CHECKSTYLE: resume IllegalCatch
+ }
+ }
+
+ /**
+ * Returns the number of supersteps traced for the given job.
+ */
+ static class GetSupersteps extends ServerHttpHandler {
+ @Override
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ // CHECKSTYLE: stop IllegalCatch
+ try {
+ // jobId and superstepId are mandatory. Validate.
+ if (jobId == null) {
+ throw new IllegalArgumentException("Missing mandatory params.");
+ }
+ List<Long> superstepIds = null;
+ // May throw IOException. Handled below.
+ superstepIds = ServerUtils.getSuperstepsDebugged(jobId);
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ // Returns output as an array ["id1", "id2", "id3" .... ]
+ this.response = new JSONArray(superstepIds).toString();
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s and %s are mandatory parameter.",
+ ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+ }
+ // CHECKSTYLE: resume IllegalCatch
+ }
+ }
+
+ /**
+ * Returns the scenario for a given superstep of a given job.
+ *
+ * URL Params: {jobId, superstepId, [vertexId], [raw]}
+ * vertexId: vertexId is optional. It can be a single value or a comma
+ * separated list. If it is not supplied, returns the scenario for all
+ * vertices. If 'raw' parameter is specified, returns the raw protocol
+ * buffer.
+ */
+ static class GetScenario extends ServerHttpHandler {
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+ // Check both jobId and superstepId are present
+ // CHECKSTYLE: stop IllegalCatch
+ try {
+ if (jobId == null || superstepId == null) {
+ throw new IllegalArgumentException("Missing mandatory parameters");
+ }
+ Long superstepNo = Long.parseLong(paramMap
+ .get(ServerUtils.SUPERSTEP_ID_KEY));
+ if (superstepNo < -1) {
+ this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+ this.response = String.format("%s must be an integer >= -1.",
+ ServerUtils.SUPERSTEP_ID_KEY);
+ return;
+ }
+ List<String> vertexIds = null;
+ // Get the single vertexId or the list of vertexIds (comma-separated).
+ String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+ // No vertex Id supplied. Return scenario for all vertices.
+ if (rawVertexIds == null) {
+ // Read scenario for all vertices.
+ // May throw IOException. Handled below.
+ vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
+ DebugTrace.VERTEX_ALL);
+ } else {
+ // Split the vertices by comma.
+ vertexIds = Lists.newArrayList(rawVertexIds.split(","));
+ }
+ // Send JSON by default.
+ JSONObject scenarioObj = new JSONObject();
+ for (String vertexId : vertexIds) {
+ GiraphVertexScenarioWrapper giraphScenarioWrapper;
+ giraphScenarioWrapper = ServerUtils.readScenarioFromTrace(jobId,
+ superstepNo, vertexId.trim(), DebugTrace.VERTEX_REGULAR);
+ scenarioObj.put(vertexId,
+ ServerUtils.scenarioToJSON(giraphScenarioWrapper));
+ }
+ // Set status as OK and convert JSONObject to string.
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ this.response = scenarioObj.toString();
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s and %s are mandatory parameter.",
+ ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+ }
+ // CHECKSTYLE: stop IllegalCatch
+ }
+ }
+
+ /**
+ * Returns the JAVA code for vertex scenario.
+ *
+ * URL Params: {jobId, superstepId, vertexId, traceType}
+ * traceType: Can be one of reg, err, msg or vv
+ */
+ static class GetVertexTest extends ServerHttpHandler {
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+ String vertexId = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+ String traceType = paramMap.get(ServerUtils.VERTEX_TEST_TRACE_TYPE_KEY);
+ // Check both jobId, superstepId and vertexId are present
+ try {
+ if (jobId == null || superstepId == null || vertexId == null ||
+ traceType == null) {
+ throw new IllegalArgumentException("Missing mandatory parameters");
+ }
+ Long superstepNo = Long.parseLong(paramMap
+ .get(ServerUtils.SUPERSTEP_ID_KEY));
+ if (superstepNo < -1) {
+ throw new NumberFormatException();
+ }
+ DebugTrace debugTrace = DebuggerUtils
+ .getVertexDebugTraceForPrefix(traceType);
+ // Send JSON by default.
+ GiraphVertexScenarioWrapper giraphScenarioWrapper = ServerUtils
+ .readScenarioFromTrace(jobId, superstepNo, vertexId.trim(),
+ debugTrace);
+ ComputationComputeTestGenerator testGenerator =
+ new ComputationComputeTestGenerator();
+ String testClassName = String.format("%sTest_%s_S%s_V%s",
+ giraphScenarioWrapper.getVertexScenarioClassesWrapper()
+ .getClassUnderTest().getSimpleName(), jobId, superstepId, vertexId);
+ // Set the content-disposition header to force a download with the
+ // given filename.
+ String filename = String.format("%s.java", testClassName);
+ this.setResponseHeader("Content-Disposition",
+ String.format("attachment; filename=\"%s\"", filename));
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ this.responseContentType = MediaType.TEXT_PLAIN;
+ this.response = testGenerator
+ .generateTest(giraphScenarioWrapper,
+ null /* testPackage is optional */, testClassName);
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s, %s and %s are mandatory parameter.",
+ ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
+ ServerUtils.VERTEX_ID_KEY));
+ }
+ }
+ }
+
+ /**
+ * Returns the JAVA code for master scenario.
+ *
+ * @URLParams : {jobId, superstepId}
+ */
+ static class GetMasterTest extends ServerHttpHandler {
+ @Override
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+ // Check both jobId, superstepId and vertexId are present
+ try {
+ if (jobId == null || superstepId == null) {
+ throw new IllegalArgumentException("Missing mandatory parameters");
+ }
+ Long superstepNo = Long.parseLong(paramMap
+ .get(ServerUtils.SUPERSTEP_ID_KEY));
+ if (superstepNo < -1) {
+ this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+ this.response = String.format("%s must be an integer >= -1.",
+ ServerUtils.SUPERSTEP_ID_KEY);
+ return;
+ }
+ // Send JSON by default.
+ GiraphMasterScenarioWrapper giraphScenarioWrapper = ServerUtils
+ .readMasterScenarioFromTrace(jobId, superstepNo,
+ DebugTrace.MASTER_ALL);
+ MasterComputeTestGenerator masterTestGenerator =
+ new MasterComputeTestGenerator();
+ // Set the content-disposition header to force a download with the
+ // given filename.
+ String testClassName = String.format("%sTest_%s_S%s",
+ giraphScenarioWrapper.getMasterClassUnderTest()
+ .replaceFirst(".*\\.", ""), jobId, superstepId);
+ String filename = String.format("%s.java", testClassName);
+ this.setResponseHeader("Content-Disposition",
+ String.format("attachment; filename=\"%s\"", filename));
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ this.responseContentType = MediaType.TEXT_PLAIN;
+ this.response = masterTestGenerator.generateTest(giraphScenarioWrapper,
+ null /* testPackage is optional */, testClassName);
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s and %s are mandatory parameter.",
+ ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+ }
+ }
+ }
+
+ /**
+ * Returns the integrity violations based on the requested parameter. The
+ * requested parameter (type) may be one of M, E or V.
+ *
+ * URL Params: jobId, superstepId, violiationType It is an optional parameter
+ * and is only used when violationType = V
+ */
+ static class GetIntegrity extends ServerHttpHandler {
+ /**
+ * The server returns only a limited number of msg or vertex value
+ * violations. For message violations, it may not put the limit at exactly
+ * this number because it reads each violation trace which may include
+ * multiple message violations and adds all the violations in the trace to
+ * the response. Once the total message violations is over this number it
+ * stops reading traces.
+ */
+ private static final int NUM_VIOLATIONS_THRESHOLD = 50;
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+ String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+ String violationType = paramMap
+ .get(ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY);
+ try {
+ if (jobId == null || superstepId == null || violationType == null) {
+ throw new IllegalArgumentException("Missing mandatory parameters");
+ }
+ Long superstepNo = Long.parseLong(paramMap
+ .get(ServerUtils.SUPERSTEP_ID_KEY));
+ if (superstepNo < -1) {
+ throw new NumberFormatException();
+ }
+ // JSON object that will be finally returned.
+ JSONObject integrityObj = new JSONObject();
+ // Message violation
+ if (violationType.equals("M")) {
+ List<String> taskIds = ServerUtils.getTasksWithIntegrityViolations(
+ jobId, superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL);
+
+ int numViolations = 0;
+ for (String taskId : taskIds) {
+ MsgIntegrityViolationWrapper msgIntegrityViolationWrapper =
+ ServerUtils.readMsgIntegrityViolationFromTrace(jobId, taskId,
+ superstepNo);
+ integrityObj.put(taskId,
+ ServerUtils.msgIntegrityToJson(msgIntegrityViolationWrapper));
+ numViolations += msgIntegrityViolationWrapper.numMsgWrappers();
+ if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
+ break;
+ }
+ }
+ this.response = integrityObj.toString();
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ } else if (violationType.equals("V")) {
+ List<String> vertexIds = ServerUtils.getVerticesDebugged(jobId,
+ superstepNo, DebugTrace.INTEGRITY_VERTEX);
+ int numViolations = 0;
+ for (String vertexId : vertexIds) {
+ GiraphVertexScenarioWrapper giraphVertexScenarioWrapper =
+ ServerUtils.readVertexIntegrityViolationFromTrace(jobId,
+ superstepNo, vertexId);
+ numViolations++;
+ integrityObj.put(vertexId,
+ ServerUtils.vertexIntegrityToJson(giraphVertexScenarioWrapper));
+ if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
+ break;
+ }
+ }
+ this.response = integrityObj.toString();
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ } else if (violationType.equals("E")) {
+ List<String> vertexIds = null;
+ // Get the single vertexId or the list of vertexIds (comma-separated).
+ String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+ // No vertex Id supplied. Return exceptions for all vertices.
+ if (rawVertexIds == null) {
+ // Read exceptions for all vertices.
+ vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
+ DebugTrace.VERTEX_EXCEPTION);
+ } else {
+ // Split the vertices by comma.
+ vertexIds = Lists.newArrayList(rawVertexIds.split(","));
+ }
+ // Send JSON by default.
+ JSONObject scenarioObj = new JSONObject();
+ for (String vertexId : vertexIds) {
+ GiraphVertexScenarioWrapper giraphScenarioWrapper;
+ giraphScenarioWrapper = ServerUtils.readScenarioFromTrace(jobId,
+ superstepNo, vertexId.trim(), DebugTrace.VERTEX_EXCEPTION);
+ scenarioObj.put(vertexId,
+ ServerUtils.scenarioToJSON(giraphScenarioWrapper));
+ }
+ // Set status as OK and convert JSONObject to string.
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ this.response = scenarioObj.toString();
+ }
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s, %s and %s are mandatory parameter.",
+ ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
+ ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY));
+ }
+ }
+ }
+
+ /**
+ * Returns the TestGraph JAVA code.
+ *
+ * URL Param: adjList - Adjacency list of the graph
+ */
+ static class GetTestGraph extends ServerHttpHandler {
+ @Override
+ public void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap) {
+ String adjList = paramMap.get(ServerUtils.ADJLIST_KEY);
+ // Check both jobId and superstepId are present
+ try {
+ if (adjList == null) {
+ throw new IllegalArgumentException("Missing mandatory parameters");
+ }
+ TestGraphGenerator testGraphGenerator = new TestGraphGenerator();
+ String testGraph = testGraphGenerator.generate(adjList.split("\n"));
+ this.setResponseHeader("Content-Disposition",
+ "attachment; filename=graph.java");
+ this.statusCode = HttpURLConnection.HTTP_OK;
+ this.responseContentType = MediaType.TEXT_PLAIN;
+ this.response = testGraph;
+ } catch (Exception e) {
+ this.handleException(e, String.format(
+ "Invalid parameters. %s is mandatory parameter.",
+ ServerUtils.ADJLIST_KEY));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
new file mode 100644
index 0000000..50ebb23
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.log4j.Logger;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+/**
+ * The Abstract class for HTTP handlers.
+ */
+public abstract class ServerHttpHandler implements HttpHandler {
+
+ /**
+ * Logger for this class.
+ */
+ private static final Logger LOG = Logger.getLogger(ServerHttpHandler.class);
+ /**
+ * Response body.
+ */
+ protected String response;
+ /**
+ * Response body as a byte array
+ */
+ protected byte[] responseBytes;
+ /**
+ * Response status code. Please use HttpUrlConnection final static members.
+ */
+ protected int statusCode;
+ /**
+ * MimeType of the response. Please use MediaType final static members.
+ */
+ protected String responseContentType;
+ /**
+ * HttpExchange object received in the handle call.
+ */
+ protected HttpExchange httpExchange;
+
+ /**
+ * Handles an HTTP call's lifecycle - read parameters, process and send
+ * response.
+ * @param httpExchange the http exchange object.
+ */
+ @Override
+ public void handle(HttpExchange httpExchange) throws IOException {
+ // Assign class members so that subsequent methods can use it.
+ this.httpExchange = httpExchange;
+ // Set application/json as the default content type.
+ this.responseContentType = MediaType.APPLICATION_JSON;
+ String rawUrl = httpExchange.getRequestURI().getQuery();
+ Map<String, String> paramMap;
+ try {
+ paramMap = ServerUtils.getUrlParams(rawUrl);
+ // Call the method implemented by inherited classes.
+ LOG.info(httpExchange.getRequestURI().getPath() + paramMap.toString());
+ processRequest(httpExchange, paramMap);
+ } catch (UnsupportedEncodingException ex) {
+ this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+ this.response = "Malformed URL. Given encoding is not supported.";
+ }
+ // In case of an error statusCode, we just write the exception string.
+ // (Consider using JSON).
+ if (this.statusCode != HttpURLConnection.HTTP_OK) {
+ this.responseContentType = MediaType.TEXT_PLAIN;
+ }
+ // Set mandatory Response Headers.
+ this.setMandatoryResponseHeaders();
+ this.writeResponse();
+ }
+
+ /**
+ * Writes the text response.
+ */
+ private void writeResponse() throws IOException {
+ OutputStream os = this.httpExchange.getResponseBody();
+ if (this.responseContentType == MediaType.APPLICATION_JSON ||
+ this.responseContentType == MediaType.TEXT_PLAIN) {
+ this.httpExchange.sendResponseHeaders(this.statusCode,
+ this.response.length());
+ os.write(this.response.getBytes());
+ } else if (this.responseContentType == MediaType.APPLICATION_OCTET_STREAM) {
+ this.httpExchange.sendResponseHeaders(this.statusCode,
+ this.responseBytes.length);
+ os.write(this.responseBytes);
+ }
+ os.close();
+ }
+
+ /**
+ * Add mandatory headers to the HTTP response by the debugger server. MUST be
+ * called before sendResponseHeaders.
+ */
+ private void setMandatoryResponseHeaders() {
+ // TODO(vikesh): **REMOVE CORS FOR ALL AFTER DECIDING THE DEPLOYMENT
+ // ENVIRONMENT**
+ Headers headers = this.httpExchange.getResponseHeaders();
+ headers.add("Access-Control-Allow-Origin", "*");
+ headers.add("Content-Type", this.responseContentType);
+ }
+
+ /**
+ * Sets the given headerKey to the given headerValue.
+ *
+ * @param headerKey - Header Key
+ * @param headerValue - Header Value.
+ * @desc - For example, call like this to set the Content-disposition header
+ * setResponseHeader("Content-disposition", "attachment");
+ */
+ protected void setResponseHeader(String headerKey, String headerValue) {
+ Headers responseHeaders = this.httpExchange.getResponseHeaders();
+ responseHeaders.add(headerKey, headerValue);
+ }
+
+ /**
+ * Handle the common exceptions in processRequest.
+ *
+ * @param e thrown exception.
+ * @param illegalArgumentMessage - Message when illegal argument
+ * exception is thrown. Optional - May be null.
+ */
+ protected void handleException(Exception e, String illegalArgumentMessage) {
+ e.printStackTrace();
+ LOG.error(e);
+ if (e instanceof NumberFormatException) {
+ this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+ this.response = String.format("%s must be an integer >= -1.",
+ ServerUtils.SUPERSTEP_ID_KEY);
+ } else if (e instanceof IllegalArgumentException) {
+ this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+ this.response = illegalArgumentMessage;
+ } else if (e instanceof FileNotFoundException) {
+ this.statusCode = HttpURLConnection.HTTP_NOT_FOUND;
+ this.response = "File not found on the server. Please ensure this " +
+ "vertex/master was debugged.";
+ } else if (e instanceof IOException ||
+ e instanceof InstantiationException ||
+ e instanceof IllegalAccessException ||
+ e instanceof ClassNotFoundException) {
+ this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
+ this.response = "Internal Server Error.";
+ } else {
+ LOG.error("Unknown Exception: " + e.toString());
+ this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
+ this.response = "Unknown exception occured.";
+ }
+ }
+
+ /**
+ * Implement this method in inherited classes. This method MUST set statusCode
+ * and response (or responseBytes) class members appropriately. In case the
+ * Content type is not JSON, must specify the new Content type. Default type
+ * is application/json. Non-200 Status is automatically assigned text/plain.
+ *
+ * @param httpExchange the http exchange object within which the paramters
+ * will be set.
+ * @param paramMap map of parameters.
+ */
+ public abstract void processRequest(HttpExchange httpExchange,
+ Map<String, String> paramMap);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
new file mode 100644
index 0000000..d48d5ca
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.giraph.debugger.utils.AggregatedValueWrapper;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.NeighborWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.OutgoingMessageWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper.ExtendedOutgoingMessageWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility methods for Debugger Server.
+ */
+@SuppressWarnings("rawtypes")
+public class ServerUtils {
+ /**
+ * String for specifying the job id parameter.
+ */
+ public static final String JOB_ID_KEY = "jobId";
+ /**
+ * String for specifying the vertex id parameter.
+ */
+ public static final String VERTEX_ID_KEY = "vertexId";
+ /**
+ * String for specifying the superstep id parameter.
+ */
+ public static final String SUPERSTEP_ID_KEY = "superstepId";
+ /**
+ * String for specifying the type of integrity violation parameter.
+ */
+ public static final String INTEGRITY_VIOLATION_TYPE_KEY = "type";
+ /**
+ * String for specifying the task id.
+ */
+ public static final String TASK_ID_KEY = "taskId";
+ /**
+ * String for specifying the trace type, i.e., {@link DebugTrace}.
+ */
+ public static final String VERTEX_TEST_TRACE_TYPE_KEY = "traceType";
+ /**
+ * String for specifying the adjacency list parameter.
+ */
+ public static final String ADJLIST_KEY = "adjList";
+
+ /**
+ * Logger for this class.
+ */
+ private static final Logger LOG = Logger.getLogger(ServerUtils.class);
+
+ /**
+ * Cached FileSystem opened by {@link #getFileSystem()}.
+ */
+ private static FileSystem FILE_SYSTEM_CACHED;
+
+ /**
+ * Private constructor to disallow construction.
+ */
+ private ServerUtils() { }
+
+ /**
+ * Returns parameters of the URL in a hash map. For instance,
+ * http://localhost:9000/?key1=val1&key2=val2&key3=val3.
+ * @param rawUrl url with the parameters attached, which will be parsed.
+ * @return the parameters on the url.
+ */
+ public static Map<String, String> getUrlParams(String rawUrl)
+ throws UnsupportedEncodingException {
+ HashMap<String, String> paramMap = Maps.newHashMap();
+
+ if (rawUrl != null) {
+ String[] params = rawUrl.split("&");
+ for (String param : params) {
+ String[] parts = param.split("=");
+ String paramKey = URLDecoder.decode(parts[0], "UTF-8");
+ String paramValue = URLDecoder.decode(parts[1], "UTF-8");
+ paramMap.put(paramKey, paramValue);
+ }
+ }
+ return paramMap;
+ }
+
+ /**
+ * Returns the HDFS FileSystem reference. Note: We assume that the classpath
+ * contains the Hadoop's conf directory or the core-site.xml and hdfs-site.xml
+ * configuration directories.
+ * @return a {@link FileSystem} object to be used to read from HDFS.
+ */
+ public static FileSystem getFileSystem() throws IOException {
+ if (FILE_SYSTEM_CACHED == null) {
+ Configuration configuration = new Configuration();
+ FILE_SYSTEM_CACHED = FileSystem.get(configuration);
+ }
+ return FILE_SYSTEM_CACHED;
+ }
+
+ /**
+ * @param jobId id of the job, whose jar path will be returned.
+ * @return a url wrapped inside an array for convenience.
+ */
+ public static URL[] getCachedJobJarPath(String jobId) {
+ // read the jar signature file under the TRACE_ROOT/jobId/
+ Path jarSignaturePath = new Path(DebuggerUtils.getTraceFileRoot(jobId) +
+ "/" + "jar.signature");
+ try {
+ FileSystem fs = getFileSystem();
+ try (FSDataInputStream jarSignatureInput = fs.open(jarSignaturePath)) {
+ List<String> lines = IOUtils.readLines(jarSignatureInput);
+ if (lines.size() > 0) {
+ String jarSignature = lines.get(0);
+ // check if jar is already in JARCACHE_LOCAL
+ File localFile = new File(DebuggerUtils.JARCACHE_LOCAL + "/" +
+ jarSignature + ".jar");
+ if (!localFile.exists()) {
+ // otherwise, download from HDFS
+ Path hdfsPath = new Path(fs.getUri().resolve(
+ DebuggerUtils.JARCACHE_HDFS + "/" + jarSignature + ".jar"));
+ Logger.getLogger(ServerUtils.class).info(
+ "Copying from HDFS: " + hdfsPath + " to " + localFile);
+ localFile.getParentFile().mkdirs();
+ fs.copyToLocalFile(hdfsPath, new Path(localFile.toURI()));
+ }
+ return new URL[] { localFile.toURI().toURL() };
+ }
+ }
+ } catch (IOException e) {
+ // gracefully ignore if we failed to read the jar.signature
+ LOG.warn("An IOException is thrown but will be ignored: " +
+ e.toString());
+ }
+ return new URL[0];
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param superstepNo superstep number.
+ * @param vertexId id of the vertex.
+ * @param debugTrace must be one of VERTEX_* or INTEGRITY_VERTEX types.
+ * @return path of the vertex trace file on HDFS.
+ */
+ public static String getVertexTraceFilePath(String jobId, long superstepNo,
+ String vertexId, DebugTrace debugTrace) {
+ assert EnumSet.of(DebugTrace.VERTEX_EXCEPTION, DebugTrace.VERTEX_REGULAR,
+ DebugTrace.INTEGRITY_VERTEX).contains(debugTrace);
+ return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId), String
+ .format(DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo,
+ vertexId));
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param taskId id of the task.
+ * @param superstepNo superstep number.
+ * @param debugTrace must be INTEGRITY_MESSAGE.
+ * @return path of the vertex trace file on HDFS.
+ */
+ public static String getIntegrityTraceFilePath(String jobId, String taskId,
+ long superstepNo, DebugTrace debugTrace) {
+ assert EnumSet.of(DebugTrace.INTEGRITY_MESSAGE_ALL).contains(debugTrace);
+ return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId),
+ String.format(DebuggerUtils.getTraceFileFormat(debugTrace), taskId,
+ superstepNo));
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param superstepNo superstep number.
+ * @param debugTrace must be of type MASTER_*.
+ * @return path of the master compute trace file on HDFS.
+ */
+ public static String getMasterTraceFilePath(String jobId, long superstepNo,
+ DebugTrace debugTrace) {
+ assert EnumSet.of(DebugTrace.MASTER_ALL, DebugTrace.MASTER_EXCEPTION,
+ DebugTrace.MASTER_REGULAR).contains(debugTrace);
+ return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId),
+ String.format(DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo));
+ }
+
+ /**
+ * Reads the protocol buffer trace corresponding to the given jobId,
+ * superstepNo and vertexId and returns the giraphScenarioWrapper.
+ *
+ * @param jobId
+ * : ID of the job debugged.
+ * @param superstepNo
+ * : Superstep number debugged.
+ * @param vertexId
+ * - ID of the vertex debugged. Returns GiraphScenarioWrapper.
+ * @param debugTrace - Can be either any one of VERTEX_* and
+ * INTEGRITY_MESSAGE_SINGLE_VERTEX.
+ * @return the vertex scenario stored in the trace file represented as a
+ * {@link GiraphVertexScenarioWrapper} object.
+ */
+ public static GiraphVertexScenarioWrapper readScenarioFromTrace(String jobId,
+ long superstepNo, String vertexId, DebugTrace debugTrace)
+ throws IOException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+ FileSystem fs = ServerUtils.getFileSystem();
+ GiraphVertexScenarioWrapper giraphScenarioWrapper =
+ new GiraphVertexScenarioWrapper();
+ EnumSet<DebugTrace> enumSet = EnumSet.of(debugTrace);
+ if (debugTrace == DebugTrace.VERTEX_ALL) {
+ enumSet = EnumSet.of(DebugTrace.VERTEX_REGULAR,
+ DebugTrace.VERTEX_EXCEPTION, DebugTrace.INTEGRITY_VERTEX,
+ DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX);
+ }
+ // Loops through all possible debug traces and returns the first one found.
+ for (DebugTrace enumValue : enumSet) {
+ String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId,
+ superstepNo, vertexId, enumValue);
+ try {
+ // If scenario is found, return it.
+ giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath,
+ getCachedJobJarPath(jobId));
+ return giraphScenarioWrapper;
+ } catch (FileNotFoundException e) {
+ // Ignore the exception since we will try reading another traceType
+ // again.
+ LOG.info("readScenarioFromTrace: File not found. Ignoring.");
+ }
+ }
+ // None of the debugTrace types were found. Throw exception.
+ throw new FileNotFoundException("Debug Trace not found.");
+ }
+
+ /**
+ * Reads the master protocol buffer trace corresponding to the given jobId and
+ * superstepNo and returns the GiraphMasterScenarioWrapper object.
+ *
+ * @param jobId
+ * : ID of the job debugged.
+ * @param superstepNo
+ * : Superstep number debugged.
+ * @param debugTrace - Can be either MASTER_REGULAR, MASTER_EXCEPTION OR
+ * MASTER_ALL. In case of MASTER_ALL, returns whichever trace is
+ * available.
+ * @return the master scenario stored in the trace file represented as a
+ * {@link GiraphMasterScenarioWrapper} object.
+ */
+ public static GiraphMasterScenarioWrapper readMasterScenarioFromTrace(
+ String jobId, long superstepNo, DebugTrace debugTrace) throws IOException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ if (!EnumSet.of(DebugTrace.MASTER_ALL, DebugTrace.MASTER_EXCEPTION,
+ DebugTrace.MASTER_REGULAR).contains(debugTrace)) {
+ // Throw exception for unsupported debug trace.
+ throw new IllegalArgumentException(
+ "DebugTrace type is invalid. Use REGULAR, EXCEPTION or ALL_VERTICES");
+ }
+ FileSystem fs = ServerUtils.getFileSystem();
+ GiraphMasterScenarioWrapper giraphScenarioWrapper =
+ new GiraphMasterScenarioWrapper();
+ // For each superstep, there is either a "regular" master trace (saved in
+ // master_reg_stp_i.tr files), or an "exception" master trace (saved in
+ // master_err_stp_i.tr files). We first check to see if a regular master
+ // trace is available. If not, then we check to see if an exception master
+ // trace is available.
+ if (debugTrace == DebugTrace.MASTER_REGULAR ||
+ debugTrace == DebugTrace.MASTER_ALL) {
+ String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId,
+ superstepNo, DebugTrace.MASTER_REGULAR);
+ try {
+ giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath,
+ getCachedJobJarPath(jobId));
+ // If scenario is found, return it.
+ return giraphScenarioWrapper;
+ } catch (FileNotFoundException e) {
+ // If debugTrace was null, ignore this exception since
+ // we will try reading exception trace later.
+ if (debugTrace == DebugTrace.MASTER_ALL) {
+ LOG.info("readMasterScenarioFromTrace: Regular file not found. " +
+ "Ignoring.");
+ } else {
+ throw e;
+ }
+ }
+ }
+ // This code is reached only when debugTrace = exception or null.
+ // In case of null, it is only reached when regular trace is not found
+ // already.
+ String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId,
+ superstepNo, DebugTrace.MASTER_EXCEPTION);
+ giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath,
+ getCachedJobJarPath(jobId));
+ return giraphScenarioWrapper;
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param taskId id of the task.
+ * @param superstepNo superstep number.
+ * @return the {@linke MsgIntegrityViolationWrapper} from trace file.
+ */
+ public static MsgIntegrityViolationWrapper readMsgIntegrityViolationFromTrace(
+ String jobId, String taskId, long superstepNo) throws IOException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFilePath = ServerUtils.getIntegrityTraceFilePath(jobId, taskId,
+ superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL);
+ MsgIntegrityViolationWrapper msgIntegrityViolationWrapper =
+ new MsgIntegrityViolationWrapper();
+ msgIntegrityViolationWrapper.loadFromHDFS(fs, traceFilePath,
+ getCachedJobJarPath(jobId));
+ return msgIntegrityViolationWrapper;
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param superstepNo superstep number.
+ * @param vertexId id of the vertex.
+ * @return the vertex integrity data from the trace file stored inside
+ * {@link GiraphVertexScenarioWrapper}.
+ */
+ public static GiraphVertexScenarioWrapper
+ readVertexIntegrityViolationFromTrace(String jobId, long superstepNo,
+ String vertexId) throws IOException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId,
+ superstepNo, vertexId, DebugTrace.INTEGRITY_VERTEX);
+ GiraphVertexScenarioWrapper giraphScenarioWrapper =
+ new GiraphVertexScenarioWrapper();
+ giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath);
+ return giraphScenarioWrapper;
+ }
+
+ /**
+ * Converts a Giraph Scenario (giraphScenarioWrapper object) to JSON
+ * (JSONObject)
+ *
+ * @param giraphScenarioWrapper Giraph Scenario object.
+ * @return scenario data stored as json.
+ */
+ public static JSONObject scenarioToJSON(
+ GiraphVertexScenarioWrapper giraphScenarioWrapper) throws JSONException {
+ VertexContextWrapper contextWrapper = giraphScenarioWrapper
+ .getContextWrapper();
+ JSONObject scenarioObj = new JSONObject();
+ scenarioObj.put("vertexId", contextWrapper.getVertexIdWrapper());
+ scenarioObj.put("vertexValue", contextWrapper.getVertexValueAfterWrapper());
+ JSONObject outgoingMessagesObj = new JSONObject();
+ JSONArray neighborsList = new JSONArray();
+ // Add outgoing messages.
+ for (Object outgoingMessage : contextWrapper.getOutgoingMessageWrappers()) {
+ OutgoingMessageWrapper outgoingMessageWrapper =
+ (OutgoingMessageWrapper) outgoingMessage;
+ outgoingMessagesObj.put(outgoingMessageWrapper.getDestinationId().
+ toString(), outgoingMessageWrapper.getMessage().toString());
+ }
+ // Add incoming messages.
+ ArrayList<String> incomingMessagesList = new ArrayList<String>();
+ for (Object incomingMessage : contextWrapper.getIncomingMessageWrappers()) {
+ incomingMessagesList.add(incomingMessage.toString());
+ }
+ // Add neighbors.
+ for (Object neighbor : contextWrapper.getNeighborWrappers()) {
+ JSONObject neighborObject = new JSONObject();
+ NeighborWrapper neighborWrapper = (NeighborWrapper) neighbor;
+ neighborObject.put("neighborId", neighborWrapper.getNbrId());
+ neighborObject.put("edgeValue", neighborWrapper.getEdgeValue());
+ neighborsList.put(neighborObject);
+ }
+ scenarioObj.put("outgoingMessages", outgoingMessagesObj);
+ scenarioObj.put("incomingMessages", incomingMessagesList);
+ scenarioObj.put("neighbors", neighborsList);
+ // Add exception, if present.
+ if (giraphScenarioWrapper.hasExceptionWrapper()) {
+ JSONObject exceptionObj = new JSONObject();
+ ExceptionWrapper exceptionWrapper = giraphScenarioWrapper
+ .getExceptionWrapper();
+ exceptionObj.put("message", exceptionWrapper.getErrorMessage());
+ exceptionObj.put("stackTrace", exceptionWrapper.getStackTrace());
+ scenarioObj.put("exception", exceptionObj);
+ }
+ JSONObject aggregateObj = new JSONObject();
+ for (Object aggregatedValue : contextWrapper
+ .getCommonVertexMasterContextWrapper().getPreviousAggregatedValues()) {
+ AggregatedValueWrapper aggregatedValueWrapper =
+ (AggregatedValueWrapper) aggregatedValue;
+ aggregateObj.put(aggregatedValueWrapper.getKey(),
+ aggregatedValueWrapper.getValue());
+ }
+ scenarioObj.put("aggregators", aggregateObj);
+ return scenarioObj;
+ }
+
+ /**
+ * Converts the message integrity violation wrapper to JSON.
+ *
+ * @param msgIntegrityViolationWrapper {@link MsgIntegrityViolationWrapper}
+ * object.
+ * @return message integrity violation data stored as json.
+ */
+ public static JSONObject msgIntegrityToJson(
+ MsgIntegrityViolationWrapper msgIntegrityViolationWrapper)
+ throws JSONException {
+ JSONObject scenarioObj = new JSONObject();
+ ArrayList<JSONObject> violationsList = new ArrayList<JSONObject>();
+ scenarioObj.put("superstepId",
+ msgIntegrityViolationWrapper.getSuperstepNo());
+ for (Object msgWrapper : msgIntegrityViolationWrapper
+ .getExtendedOutgoingMessageWrappers()) {
+ ExtendedOutgoingMessageWrapper extendedOutgoingMessageWrapper =
+ (ExtendedOutgoingMessageWrapper) msgWrapper;
+ JSONObject violationObj = new JSONObject();
+ violationObj.put("srcId", extendedOutgoingMessageWrapper.getSrcId());
+ violationObj.put("destinationId",
+ extendedOutgoingMessageWrapper.getDestinationId());
+ violationObj.put("message", extendedOutgoingMessageWrapper.getMessage());
+ violationsList.add(violationObj);
+ }
+ scenarioObj.put("violations", violationsList);
+ return scenarioObj;
+ }
+
+ /**
+ * Converts the vertex integrity violation wrapper to JSON.
+ *
+ * @param giraphVertexScenarioWrapper {@link GiraphVertexScenarioWrapper}
+ * object storing the vertex value violation data.
+ * @return vertex integrity violation data stored as json.
+ */
+ public static JSONObject vertexIntegrityToJson(
+ GiraphVertexScenarioWrapper giraphVertexScenarioWrapper)
+ throws JSONException {
+ JSONObject scenarioObj = new JSONObject();
+ VertexContextWrapper vertexContextWrapper = giraphVertexScenarioWrapper
+ .getContextWrapper();
+ scenarioObj.put("vertexId", vertexContextWrapper.getVertexIdWrapper());
+ scenarioObj.put("vertexValue",
+ vertexContextWrapper.getVertexValueAfterWrapper());
+ return scenarioObj;
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param superstepNo superstep number.
+ * @param debugTrace type of vertex trace files.
+ * @return a list of vertex Ids that were debugged in the given superstep by
+ * reading (the file names of) the debug traces on HDFS. File names follow the
+ * <prefix>_stp_<superstepNo>_vid_<vertexId>.tr naming convention.
+ */
+ public static List<String> getVerticesDebugged(String jobId,
+ long superstepNo, DebugTrace debugTrace) throws IOException {
+ ArrayList<String> vertexIds = new ArrayList<String>();
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+ // Use this regex to match the file name and capture the vertex id.
+ String regex = String.format(DebuggerUtils.getTraceFileFormat(debugTrace),
+ superstepNo, "(.*?)");
+ Pattern p = Pattern.compile(regex);
+ Path pt = new Path(traceFileRoot);
+ FileStatus[] fileStatuses = null;
+ // Hadoop listStatus returns null when path is not found.
+ fileStatuses = fs.listStatus(pt);
+ if (fileStatuses == null) {
+ throw new FileNotFoundException("Debug trace file not found.");
+ }
+ // Iterate through each file in this diFilerectory and match the regex.
+ for (FileStatus fileStatus : fileStatuses) {
+ String fileName = fileStatus.getPath().getName();
+ Matcher m = p.matcher(fileName);
+ // Add this vertex id if there is a match.
+ if (m.find()) {
+ // VERTEX_ALL debug trace has one group to match the prefix -reg|err.
+ // FIXME XXX this is terrible: we pretend to know nothing about the
+ // patterns defined in DebuggerUtils#getTraceFileFormat(), but all of a
+ // sudden we're using inside knowledge to extract the vertex id part. :S
+ vertexIds.add(m.group(debugTrace == DebugTrace.VERTEX_ALL ? 2 : 1));
+ }
+ }
+ return vertexIds;
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @param superstepNo superstep number.
+ * @param debugTrace must be one of INTEGRITY_* types.
+ * @return the IDs of all the tasks that caused the given integrity violation.
+ */
+ public static List<String> getTasksWithIntegrityViolations(String jobId,
+ long superstepNo, DebugTrace debugTrace) throws IOException {
+ assert EnumSet.of(DebugTrace.INTEGRITY_MESSAGE_ALL,
+ DebugTrace.INTEGRITY_VERTEX).contains(debugTrace);
+ ArrayList<String> taskIds = new ArrayList<String>();
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+ // Use this regex to match the file name and capture the vertex id.
+ String regex = String.format(DebuggerUtils.getTraceFileFormat(debugTrace),
+ "(.*?)", superstepNo);
+ Pattern p = Pattern.compile(regex);
+ Path pt = new Path(traceFileRoot);
+ FileStatus[] fileStatuses = null;
+ // Hadoop listStatus returns null when path is not found.
+ fileStatuses = fs.listStatus(pt);
+ if (fileStatuses == null) {
+ throw new FileNotFoundException("Debug trace file not found.");
+ }
+ // Iterate through each file in this directory and match the regex.
+ for (FileStatus fileStatus : fileStatuses) {
+ String fileName = fileStatus.getPath().getName();
+ Matcher m = p.matcher(fileName);
+ // Add this vertex id if there is a match.
+ if (m.find()) {
+ taskIds.add(m.group(1));
+ }
+ }
+ return taskIds;
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @return the list of supersteps for which there is an exception or regular
+ * trace.
+ */
+ public static List<Long> getSuperstepsDebugged(String jobId)
+ throws IOException {
+ Set<Long> superstepIds = Sets.newHashSet();
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+ // Use this regex to match the file name and capture the vertex id.
+ String regex = "(reg|err|msg_intgrty|vv_intgrty)_stp_(.*?)_vid_(.*?).tr$";
+ Pattern p = Pattern.compile(regex);
+ Path pt = new Path(traceFileRoot);
+ // Iterate through each file in this directory and match the regex.
+ for (FileStatus fileStatus : fs.listStatus(pt)) {
+ String fileName = fileStatus.getPath().getName();
+ Matcher m = p.matcher(fileName);
+ // Add this vertex id if there is a match.
+ if (m.find()) {
+ superstepIds.add(Long.parseLong(m.group(2)));
+ }
+ }
+ return Lists.newArrayList(superstepIds);
+ }
+
+ /**
+ * @param jobId id of the job.
+ * @return the list of supersteps for which there is an exception or regular
+ * trace.
+ */
+ public static List<Long> getSuperstepsMasterDebugged(String jobId)
+ throws IOException {
+ Set<Long> superstepIds = Sets.newHashSet();
+ FileSystem fs = ServerUtils.getFileSystem();
+ String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+ // Use this regex to match the file name and capture the vertex id.
+ String regex = "master_.*_stp_(\\d+?).tr$";
+ Pattern p = Pattern.compile(regex);
+ Path pt = new Path(traceFileRoot);
+ // Iterate through each file in this directory and match the regex.
+ for (FileStatus fileStatus : fs.listStatus(pt)) {
+ String fileName = fileStatus.getPath().getName();
+ Matcher m = p.matcher(fileName);
+ // Add this vertex id if there is a match.
+ if (m.find()) {
+ superstepIds.add(Long.parseLong(m.group(1)));
+ }
+ }
+ return Lists.newArrayList(superstepIds);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
new file mode 100644
index 0000000..c3aaec0
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes to run Giraph debugger GUI.
+ */
+package org.apache.giraph.debugger.gui;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
new file mode 100644
index 0000000..b6656fa
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.instrumenter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Type;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.debugger.DebugConfig;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.AbstractComputation;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Class that intercepts call to the AbstractComputation's exposed methods for
+ * GiraphDebugger.
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex data
+ * @param <E>
+ * Edge data
+ * @param <M1>
+ * Incoming message type
+ * @param <M2>
+ * Outgoing message type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public abstract class AbstractInterceptingComputation<
+ I extends WritableComparable, V extends Writable, E extends Writable,
+ M1 extends Writable, M2 extends Writable>
+ extends AbstractComputation<I, V, E, M1, M2> {
+
+
+ /**
+ * Configuration key for the class name of the class that extends DebugConfig.
+ */
+ public static final String CONFIG_CLASS_KEY = "giraph.debugger.configClass";
+
+ /**
+ * Giraph configuration for specifying the DebugConfig class.
+ */
+ public static final StrConfOption DEBUG_CONFIG_CLASS = new StrConfOption(
+ CONFIG_CLASS_KEY, DebugConfig.class.getName(),
+ "The name of the Debug Config class for the computation (e.g. " +
+ "org.apache.giraph.debugger.examples.SimpleShortestPathsDebugConfig).");
+
+ /**
+ * Logger for this class.
+ */
+ protected static final Logger LOG = Logger
+ .getLogger(AbstractInterceptingComputation.class);
+
+ /**
+ * A flag to indicate whether this Computation class was already initialized.
+ */
+ protected static boolean IS_INITIALIZED;
+ /**
+ * Whether DEBUG_CONFIG tells to check message constraints.
+ */
+ protected static boolean SHOULD_CHECK_MESSAGE_INTEGRITY;
+ /**
+ * Whether DEBUG_CONFIG tells to check vertex value constraints.
+ */
+ protected static boolean SHOULD_CHECK_VERTEX_VALUE_INTEGRITY;
+ /**
+ * Whether DEBUG_CONFIG tells to catch exceptions.
+ */
+ protected static boolean SHOULD_CATCH_EXCEPTIONS;
+
+ /**
+ * Configuration key for the path to the jar signature.
+ */
+ private static final String JAR_SIGNATURE_KEY =
+ "giraph.debugger.jarSignature";
+
+ /**
+ * A constant to limit the number of violations to log.
+ */
+ private static int NUM_VIOLATIONS_TO_LOG = 5;
+ /**
+ * A constant to limit the number of vertices to log.
+ */
+ private static int NUM_VERTICES_TO_LOG = 5;
+ /**
+ * A counter for number of vertices already logged.
+ */
+ private static int NUM_VERTICES_LOGGED = 0;
+ /**
+ * A counter for number of vertex violations already logged.
+ */
+ private static int NUM_VERTEX_VIOLATIONS_LOGGED = -1;
+ /**
+ * A counter for number of message violations already logged.
+ */
+ private static int NUM_MESSAGE_VIOLATIONS_LOGGED = -1;
+
+ /**
+ * DebugConfig instance to be used for debugging.
+ */
+ private static DebugConfig DEBUG_CONFIG;
+
+ /**
+ * The vertex id type as in the I of Giraph's Computation<I,V,E,M1,M2>.
+ */
+ private static Type VERTEX_ID_CLASS;
+ /**
+ * The vertex value type as in the V of Giraph's Computation<I,V,E,M1,M2>.
+ */
+ private static Type VERTEX_VALUE_CLASS;
+ /**
+ * The edge value type as in the E of Giraph's Computation<I,V,E,M1,M2>.
+ */
+ private static Type EDGE_VALUE_CLASS;
+ /**
+ * The incoming message type as in the M1 of Giraph's
+ * Computation<I,V,E,M1,M2>.
+ */
+ private static Type INCOMING_MESSAGE_CLASS;
+ /**
+ * The outgoing message type as in the M2 of Giraph's
+ * Computation<I,V,E,M1,M2>.
+ */
+ private static Type OUTGOING_MESSAGE_CLASS;
+
+ /**
+ * Contains previous aggregators that are available in the beginning of the
+ * superstep.In Giraph, these aggregators are immutable. NOTE: We currently
+ * only capture aggregators that are read by at least one vertex. If we want
+ * to capture all aggregators we need to change Giraph code to be get access
+ * to them.
+ */
+ private static CommonVertexMasterInterceptionUtil
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL;
+
+ /**
+ * Whether or not this vertex was configured to be debugged. If so we will
+ * intercept its outgoing messages.
+ */
+ private boolean shouldDebugVertex;
+ /**
+ * Whether to stop intercepting compute() for the remaining vertices.
+ */
+ private boolean shouldStopInterceptingVertex;
+
+ /**
+ * For vertices that are configured to be debugged, we construct a
+ * GiraphVertexScenarioWrapper in the beginning and use it to intercept
+ * outgoing messages
+ */
+ private GiraphVertexScenarioWrapper<I, V, E, M1, M2>
+ giraphVertexScenarioWrapperForRegularTraces;
+
+ /**
+ * If a vertex has violated a message value constraint when it was sending a
+ * message we set this to true so that at the inside interceptComputeEnd()
+ * method we make sure we save a vertexScenario trace for it.
+ */
+ private boolean hasViolatedMsgValueConstraint;
+ /**
+ * Stores the value of a vertex before the compute method is called. If a
+ * vertex throws an exception, or violates a vertex or message value
+ * constraint, then we use this value as the previous vertex value when we
+ * save a vertexScenario trace for it.
+ */
+ private V previousVertexValue;
+ /**
+ * DataOutputBuffer for holding the previous vertex value.
+ */
+ private DataOutputBuffer previousVertexValueOutputBuffer =
+ new DataOutputBuffer();
+ /**
+ * DataInputBuffer for cloning what was preserved for previous vertex value.
+ */
+ private DataInputBuffer previousVertexValueInputBuffer =
+ new DataInputBuffer();
+ /**
+ * We keep the vertex under compute in case some functions need it, e.g.,
+ * sendMessage().
+ */
+ private Vertex<I, V, E> currentVertexUnderCompute;
+ /**
+ * The wrapped instance of message integrity violation.
+ */
+ private MsgIntegrityViolationWrapper<I, M2> msgIntegrityViolationWrapper;
+
+ /**
+ * Provides a way to access the actual Computation class.
+ * @return The actual Computation class
+ */
+ public abstract Class<? extends Computation<I, V, E, ? extends Writable,
+ ? extends Writable>> getActualTestedClass();
+
+ /**
+ * Initializes this class to start debugging.
+ */
+ protected final synchronized void
+ initializeAbstractInterceptingComputation() {
+ if (IS_INITIALIZED) {
+ return; // don't initialize twice
+ }
+ IS_INITIALIZED = true;
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL =
+ new CommonVertexMasterInterceptionUtil(
+ getContext().getJobID().toString());
+ String debugConfigClassName = DEBUG_CONFIG_CLASS.get(getConf());
+ LOG.info("initializing debugConfigClass: " + debugConfigClassName);
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(debugConfigClassName);
+ DEBUG_CONFIG = (DebugConfig<I, V, E, M1, M2>) clazz.newInstance();
+ DEBUG_CONFIG.readConfig(getConf(), getTotalNumVertices(),
+ getContext().getJobID().getId());
+ VERTEX_ID_CLASS = getConf().getVertexIdClass();
+ VERTEX_VALUE_CLASS = getConf().getVertexValueClass();
+ EDGE_VALUE_CLASS = getConf().getEdgeValueClass();
+ INCOMING_MESSAGE_CLASS = getConf().getIncomingMessageValueClass();
+ OUTGOING_MESSAGE_CLASS = getConf().getOutgoingMessageValueClass();
+ // Set limits from DebugConfig
+ NUM_VERTICES_TO_LOG = DEBUG_CONFIG.getNumberOfVerticesToLog();
+ NUM_VIOLATIONS_TO_LOG = DEBUG_CONFIG.getNumberOfViolationsToLog();
+ // Reset counters
+ NUM_MESSAGE_VIOLATIONS_LOGGED = 0;
+ NUM_VERTEX_VIOLATIONS_LOGGED = 0;
+ NUM_VERTICES_LOGGED = 0;
+ // Cache DebugConfig flags
+ SHOULD_CATCH_EXCEPTIONS = DEBUG_CONFIG.shouldCatchExceptions();
+ SHOULD_CHECK_VERTEX_VALUE_INTEGRITY =
+ DEBUG_CONFIG.shouldCheckVertexValueIntegrity();
+ SHOULD_CHECK_MESSAGE_INTEGRITY =
+ DEBUG_CONFIG.shouldCheckMessageIntegrity();
+ } catch (InstantiationException | ClassNotFoundException |
+ IllegalAccessException e) {
+ LOG.error("Could not create a new DebugConfig instance of " +
+ debugConfigClassName);
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ if (getWorkerContext().getMyWorkerIndex() == getWorkerContext()
+ .getWorkerCount() - 1) {
+ // last worker records jar signature if necessary
+ String jarSignature = getConf().get(JAR_SIGNATURE_KEY);
+ if (jarSignature != null) {
+ Path jarSignaturePath = new Path(
+ DebuggerUtils.getTraceFileRoot(COMMON_VERTEX_MASTER_INTERCEPTING_UTIL
+ .getJobId()) + "/" + "jar.signature");
+ LOG.info("Recording jar signature (" + jarSignature + ") at " +
+ jarSignaturePath);
+ FileSystem fs = COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getFileSystem();
+ try {
+ if (!fs.exists(jarSignaturePath)) {
+ OutputStream f = fs.create(jarSignaturePath,
+ true).getWrappedStream();
+ IOUtils.write(jarSignature, f);
+ f.close();
+ }
+ } catch (IOException e) {
+ // When multiple workers try to write the jar.signature, some of them
+ // may cause
+ // AlreadyBeingCreatedException to be thrown, which we ignore.
+ e.printStackTrace();
+ }
+ }
+ }
+ LOG.info("done initializing debugConfigClass: " + debugConfigClassName);
+ }
+
+ /**
+ * Keep the vertex value as the previous one.
+ *
+ * @param vertex the vertex
+ * @throws IOException
+ */
+ private void keepPreviousVertexValue(Vertex<I, V, E> vertex) throws
+ IOException {
+ previousVertexValueOutputBuffer.reset();
+ vertex.getValue().write(previousVertexValueOutputBuffer);
+ }
+
+ /**
+ * Clone the kept previous vertex value.
+ *
+ * @return Copy of previous vertex value.
+ * Same instance will be reused across multiple calls.
+ * @throws IOException
+ */
+ private V getPreviousVertexValue() throws IOException {
+ previousVertexValueInputBuffer.reset(
+ previousVertexValueOutputBuffer.getData(),
+ previousVertexValueOutputBuffer.getLength());
+ if (previousVertexValue == null) {
+ previousVertexValue = getConf().createVertexValue();
+ }
+ previousVertexValue.readFields(previousVertexValueInputBuffer);
+ return previousVertexValue;
+ }
+
+ /**
+ * @return whether captured enough number of info for debugging.
+ */
+ private boolean hasInterceptedEnough() {
+ return NUM_VERTICES_LOGGED >= NUM_VERTICES_TO_LOG ||
+ NUM_VERTEX_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG ||
+ NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG;
+ }
+
+ /**
+ * Called before {@link Computation#preSuperstep()} to prepare a message
+ * integrity violation wrapper.
+ * @return true if compute() does not need to be intercepted for this
+ * superstep.
+ */
+ protected final boolean interceptPreSuperstepBegin() {
+ // LOG.info("before preSuperstep");
+ NUM_VERTICES_LOGGED = 0;
+ NUM_VERTEX_VIOLATIONS_LOGGED = 0;
+ NUM_MESSAGE_VIOLATIONS_LOGGED = 0;
+ if (!DEBUG_CONFIG.shouldDebugSuperstep(getSuperstep()) ||
+ hasInterceptedEnough()) {
+ shouldStopInterceptingVertex = true;
+ return true;
+ }
+ if (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY) {
+ LOG.info("creating a vertexValueViolationWrapper. superstepNo: " +
+ getSuperstep());
+ }
+
+ if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
+ LOG.info("creating a msgIntegrityViolationWrapper. superstepNo: " +
+ getSuperstep());
+ msgIntegrityViolationWrapper = new MsgIntegrityViolationWrapper<>(
+ (Class<I>) VERTEX_ID_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
+ msgIntegrityViolationWrapper.setSuperstepNo(getSuperstep());
+ }
+
+ // LOG.info("before preSuperstep done");
+ shouldStopInterceptingVertex = false;
+ return false;
+ }
+
+ /**
+ * Called immediately when the compute() method is entered. Initializes data
+ * that will be required for debugging throughout the rest of the compute
+ * function.
+ *
+ * @param vertex The vertex that's about to be computed.
+ * @param messages The incoming messages for the vertex.
+ * @throws IOException
+ */
+ protected final void interceptComputeBegin(Vertex<I, V, E> vertex,
+ Iterable<M1> messages) throws IOException {
+ if (!IS_INITIALIZED) {
+ // TODO: Sometimes Giraph doesn't call initialize() and directly calls
+ // compute(). Here we
+ // guard against things not being initiliazed, which was causing null
+ // pointer exceptions.
+ // Find out when/why this happens.
+ LOG.warn("interceptComputeBegin is called but debugConfig is null." +
+ " Initializing AbstractInterceptingComputation again...");
+ initializeAbstractInterceptingComputation();
+ }
+ // A vertex should be debugged if:
+ // 1) the user configures the superstep to be debugged;
+ // 2) the user configures the vertex to be debugged; and
+ // 3) we have already debugged less than a threshold of vertices in this
+ // superstep.
+ shouldDebugVertex = NUM_VERTICES_LOGGED < NUM_VERTICES_TO_LOG &&
+ DEBUG_CONFIG.shouldDebugVertex(vertex, getSuperstep());
+ if (shouldDebugVertex) {
+ giraphVertexScenarioWrapperForRegularTraces = getGiraphVertexScenario(
+ vertex, vertex.getValue(), messages);
+ }
+ // Keep a reference to the current vertex only when necessary.
+ if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
+ NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
+ currentVertexUnderCompute = vertex;
+ hasViolatedMsgValueConstraint = false;
+ }
+ // Keep the previous value only when necessary.
+ if (SHOULD_CATCH_EXCEPTIONS ||
+ SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
+ NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG ||
+ SHOULD_CHECK_MESSAGE_INTEGRITY &&
+ NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
+ keepPreviousVertexValue(vertex);
+ }
+ }
+
+ /**
+ * Captures exception from {@link Computation#compute(Vertex, Iterable)}.
+ *
+ * @param vertex The vertex that was being computed.
+ * @param messages The incoming messages for the vertex.
+ * @param e The exception thrown.
+ * @throws IOException
+ */
+ protected final void interceptComputeException(Vertex<I, V, E> vertex,
+ Iterable<M1> messages, Throwable e) throws IOException {
+ LOG.info("Caught an exception. message: " + e.getMessage() +
+ ". Saving a trace in HDFS.");
+ GiraphVertexScenarioWrapper<I, V, E, M1, M2>
+ giraphVertexScenarioWrapperForExceptionTrace = getGiraphVertexScenario(
+ vertex, getPreviousVertexValue(), messages);
+ ExceptionWrapper exceptionWrapper = new ExceptionWrapper(e.getMessage(),
+ ExceptionUtils.getStackTrace(e));
+ giraphVertexScenarioWrapperForExceptionTrace
+ .setExceptionWrapper(exceptionWrapper);
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+ giraphVertexScenarioWrapperForExceptionTrace, DebuggerUtils
+ .getFullTraceFileName(DebugTrace.VERTEX_EXCEPTION,
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
+ vertex.getId().toString()));
+ }
+
+ /**
+ * Called after {@link Computation#compute(Vertex, Iterable)} to check vertex
+ * and message value integrity.
+ *
+ * @param vertex The vertex that was computed.
+ * @param messages The incoming messages for the vertex.
+ * @return whether compute() needs to be intercepted more.
+ * @throws IOException
+ */
+ protected final boolean interceptComputeEnd(Vertex<I, V, E> vertex,
+ Iterable<M1> messages) throws IOException {
+ if (shouldDebugVertex) {
+ // Reflect changes made by compute to scenario.
+ giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+ .setVertexValueAfterWrapper(vertex.getValue());
+ // Save vertex scenario.
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+ giraphVertexScenarioWrapperForRegularTraces, DebuggerUtils
+ .getFullTraceFileName(DebugTrace.VERTEX_REGULAR,
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
+ vertex.getId().toString()));
+ NUM_VERTICES_LOGGED++;
+ }
+ if (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
+ NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG &&
+ !DEBUG_CONFIG.isVertexValueCorrect(vertex.getId(), vertex.getValue())) {
+ initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
+ DebugTrace.INTEGRITY_VERTEX);
+ NUM_VERTEX_VIOLATIONS_LOGGED++;
+ }
+ if (hasViolatedMsgValueConstraint) {
+ initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
+ DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX);
+ NUM_MESSAGE_VIOLATIONS_LOGGED++;
+ }
+
+ shouldStopInterceptingVertex = hasInterceptedEnough();
+ return shouldStopInterceptingVertex;
+ }
+
+ /**
+ * Called after {@link Computation#postSuperstep()} to save the captured
+ * scenario.
+ */
+ protected final void interceptPostSuperstepEnd() {
+ // LOG.info("after postSuperstep");
+ if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
+ msgIntegrityViolationWrapper.numMsgWrappers() > 0) {
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+ msgIntegrityViolationWrapper, DebuggerUtils
+ .getMessageIntegrityAllTraceFullFileName(getSuperstep(),
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), UUID.randomUUID()
+ .toString()));
+ }
+ // LOG.info("after postSuperstep done");
+ }
+
+ /**
+ * Saves the captured scenario for the given vertex.
+ *
+ * @param vertex The vertex that was computed.
+ * @param messages The incoming messages for the vertex.
+ * @param debugTrace The debug trace to save.
+ * @throws IOException
+ */
+ private void initAndSaveGiraphVertexScenarioWrapper(Vertex<I, V, E> vertex,
+ Iterable<M1> messages, DebugTrace debugTrace) throws IOException {
+ GiraphVertexScenarioWrapper<I, V, E, M1, M2>
+ giraphVertexScenarioWrapper = getGiraphVertexScenario(
+ vertex, getPreviousVertexValue(), messages);
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+ giraphVertexScenarioWrapper, DebuggerUtils.getFullTraceFileName(
+ debugTrace, COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(),
+ getSuperstep(), vertex.getId().toString()));
+ }
+
+ /**
+ * We pass the previous vertex value to assign as an argument because for some
+ * traces we capture the context lazily and store the previous value
+ * temporarily in an object. In those cases the previous value is not equal to
+ * the current value of the vertex. And sometimes it is equal to the current
+ * value.
+ *
+ * @param vertex The vertex the scenario will capture.
+ * @param previousVertexValueToAssign The previous vertex value.
+ * @param messages The incoming messages for this superstep.
+ * @return A scenario for the given vertex.
+ * @throws IOException
+ */
+ private GiraphVertexScenarioWrapper<I, V, E, M1, M2> getGiraphVertexScenario(
+ Vertex<I, V, E> vertex, V previousVertexValueToAssign,
+ Iterable<M1> messages) throws IOException {
+ GiraphVertexScenarioWrapper<I, V, E, M1, M2> giraphVertexScenarioWrapper =
+ new GiraphVertexScenarioWrapper(
+ getActualTestedClass(), (Class<I>) VERTEX_ID_CLASS,
+ (Class<V>) VERTEX_VALUE_CLASS, (Class<E>) EDGE_VALUE_CLASS,
+ (Class<M1>) INCOMING_MESSAGE_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
+ VertexContextWrapper contextWrapper =
+ giraphVertexScenarioWrapper.getContextWrapper();
+ contextWrapper
+ .setVertexValueBeforeWrapper(previousVertexValueToAssign);
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.initCommonVertexMasterContextWrapper(
+ getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
+ contextWrapper
+ .setCommonVertexMasterContextWrapper(
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL
+ .getCommonVertexMasterContextWrapper());
+ giraphVertexScenarioWrapper.getContextWrapper().setVertexIdWrapper(
+ vertex.getId());
+ Iterable<Edge<I, E>> returnVal = vertex.getEdges();
+ for (Edge<I, E> edge : returnVal) {
+ giraphVertexScenarioWrapper.getContextWrapper().addNeighborWrapper(
+ edge.getTargetVertexId(), edge.getValue());
+ }
+ for (M1 message : messages) {
+ giraphVertexScenarioWrapper.getContextWrapper()
+ .addIncomingMessageWrapper(message);
+ }
+ giraphVertexScenarioWrapper.getContextWrapper().setVertexValueAfterWrapper(
+ vertex.getValue());
+ return giraphVertexScenarioWrapper;
+ }
+
+ /**
+ * First intercepts the sent message if necessary and calls and then calls
+ * AbstractComputation's sendMessage method.
+ *
+ * @param id
+ * Vertex id to send the message to
+ * @param message
+ * Message data to send
+ */
+ @Override
+ public void sendMessage(I id, M2 message) {
+ if (!shouldStopInterceptingVertex) {
+ if (shouldDebugVertex) {
+ giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+ .addOutgoingMessageWrapper(id, message);
+ }
+ if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
+ NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
+ I senderId = currentVertexUnderCompute.getId();
+ if (!DEBUG_CONFIG.isMessageCorrect(senderId, id, message,
+ getSuperstep())) {
+ msgIntegrityViolationWrapper.addMsgWrapper(
+ currentVertexUnderCompute.getId(), id, message);
+ NUM_MESSAGE_VIOLATIONS_LOGGED++;
+ hasViolatedMsgValueConstraint = true;
+ }
+ }
+ }
+ super.sendMessage(id, message);
+ }
+
+ /**
+ * First intercepts the sent messages to all edges if necessary and calls and
+ * then calls AbstractComputation's sendMessageToAllEdges method.
+ *
+ * @param vertex
+ * Vertex whose edges to send the message to.
+ * @param message
+ * Message sent to all edges.
+ */
+ @Override
+ public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+ if (!shouldStopInterceptingVertex) {
+ if (shouldDebugVertex) {
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+ .addOutgoingMessageWrapper(edge.getTargetVertexId(), message);
+ }
+ }
+ if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
+ I senderId = vertex.getId();
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ if (NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG) {
+ break;
+ }
+ I id = edge.getTargetVertexId();
+ if (DEBUG_CONFIG.isMessageCorrect(senderId, id, message,
+ getSuperstep())) {
+ continue;
+ }
+ msgIntegrityViolationWrapper.addMsgWrapper(senderId, id, message);
+ hasViolatedMsgValueConstraint = true;
+ NUM_MESSAGE_VIOLATIONS_LOGGED++;
+ }
+ }
+ }
+ super.sendMessageToAllEdges(vertex, message);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ A retVal = super.<A>getAggregatedValue(name);
+ if (!shouldStopInterceptingVertex) {
+ COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.addAggregatedValueIfNotExists(name,
+ retVal);
+ }
+ return retVal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
new file mode 100644
index 0000000..d6ed908
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.instrumenter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Class that intercepts calls to {@link MasterCompute}'s exposed methods for
+ * GiraphDebugger.
+ */
+public abstract class AbstractInterceptingMasterCompute extends MasterCompute {
+
+ /**
+ * Logger for this class.
+ */
+ protected static final Logger LOG = Logger
+ .getLogger(AbstractInterceptingMasterCompute.class);
+ /**
+ * The master scenario being captured.
+ */
+ private GiraphMasterScenarioWrapper giraphMasterScenarioWrapper;
+ /**
+ * The utility for intercepting master computes.
+ */
+ private CommonVertexMasterInterceptionUtil commonVertexMasterInterceptionUtil;
+
+ /**
+ * Called immediately as user's {@link MasterCompute#compute()} method is
+ * entered.
+ */
+ public void interceptComputeBegin() {
+ LOG.info(this.getClass().getName() + ".interceptInitializeEnd is called ");
+ giraphMasterScenarioWrapper = new GiraphMasterScenarioWrapper(this
+ .getClass().getName());
+ if (commonVertexMasterInterceptionUtil == null) {
+ commonVertexMasterInterceptionUtil = new
+ CommonVertexMasterInterceptionUtil(getContext().getJobID().toString());
+ }
+ commonVertexMasterInterceptionUtil.initCommonVertexMasterContextWrapper(
+ getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
+ giraphMasterScenarioWrapper
+ .setCommonVertexMasterContextWrapper(commonVertexMasterInterceptionUtil
+ .getCommonVertexMasterContextWrapper());
+ }
+
+ /**
+ * Intercepts the call to {@link MasterCompute#getAggregatedValue(String)} to
+ * capture aggregator values at each superstep.
+ *
+ * @param <A>
+ * The type of the aggregator value.
+ * @param name
+ * The name of the Giraph aggregator.
+ * @return The aggregator value returned by the original
+ * {@link MasterCompute#getAggregatedValue(String)}.
+ */
+ @Intercept(renameTo = "getAggregatedValue")
+ public <A extends Writable> A getAggregatedValueIntercept(String name) {
+ A retVal = super.<A>getAggregatedValue(name);
+ commonVertexMasterInterceptionUtil.addAggregatedValueIfNotExists(name,
+ retVal);
+ return retVal;
+ }
+
+ /**
+ * Called when user's {@link MasterCompute#compute()} method throws an
+ * exception.
+ *
+ * @param e
+ * exception thrown.
+ */
+ protected final void interceptComputeException(Exception e) {
+ LOG.info("Caught an exception in user's MasterCompute. message: " +
+ e.getMessage() + ". Saving a trace in HDFS.");
+ ExceptionWrapper exceptionWrapper = new ExceptionWrapper(e.getMessage(),
+ ExceptionUtils.getStackTrace(e));
+ giraphMasterScenarioWrapper.setExceptionWrapper(exceptionWrapper);
+ commonVertexMasterInterceptionUtil.saveScenarioWrapper(
+ giraphMasterScenarioWrapper, DebuggerUtils.getFullMasterTraceFileName(
+ DebugTrace.MASTER_EXCEPTION,
+ commonVertexMasterInterceptionUtil.getJobId(), getSuperstep()));
+ }
+
+ /**
+ * Called after user's {@link MasterCompute#compute()} method returns.
+ */
+ public void interceptComputeEnd() {
+ commonVertexMasterInterceptionUtil.saveScenarioWrapper(
+ giraphMasterScenarioWrapper, DebuggerUtils.getFullMasterTraceFileName(
+ DebugTrace.MASTER_REGULAR,
+ commonVertexMasterInterceptionUtil.getJobId(), getSuperstep()));
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ }
+}