You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2014/12/08 20:21:41 UTC

[6/9] git commit: updated refs/heads/trunk to 8675c84

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
new file mode 100644
index 0000000..85b72bd
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/Server.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.giraph.debugger.mock.ComputationComputeTestGenerator;
+import org.apache.giraph.debugger.mock.MasterComputeTestGenerator;
+import org.apache.giraph.debugger.mock.TestGraphGenerator;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Entry point to the HTTP Debugger Server.
+ */
+public class Server {
+
+  /**
+   * Logger for the class.
+   */
+  private static final Logger LOG = Logger.getLogger(Server.class);
+  /**
+   * Port the server listens on. Read once, at class initialization, from the
+   * "giraph.debugger.guiPort" system property; falls back to 8000 when the
+   * property is not set.
+   */
+  private static final int SERVER_PORT = Integer.parseInt(System.getProperty(
+    "giraph.debugger.guiPort", "8000"));
+
+  /**
+   * Private constructor to disallow construction outside of the class. The
+   * class is only used through its static {@code main} entry point.
+   */
+  private Server() { }
+
+  /**
+   * Starts the debugger HTTP server on the configured port and registers one
+   * handler per endpoint.
+   *
+   * @param args command line arguments for the server (not read)
+   * @throws Exception if the server cannot be created or started
+   */
+  public static void main(String[] args) throws Exception {
+    InetSocketAddress listenAddress = new InetSocketAddress(SERVER_PORT);
+    // A backlog of 0 asks the server to use the system default.
+    HttpServer httpServer = HttpServer.create(listenAddress, 0);
+    // Endpoints that expose the captured debug traces.
+    httpServer.createContext("/vertices", new GetVertices());
+    httpServer.createContext("/supersteps", new GetSupersteps());
+    httpServer.createContext("/scenario", new GetScenario());
+    httpServer.createContext("/integrity", new GetIntegrity());
+    // Endpoints that generate downloadable test code.
+    httpServer.createContext("/test/vertex", new GetVertexTest());
+    httpServer.createContext("/test/master", new GetMasterTest());
+    httpServer.createContext("/test/graph", new GetTestGraph());
+    // Every other path is served as a static GUI resource.
+    httpServer.createContext("/", new GetEditor());
+    // Passing null selects the server's default executor.
+    httpServer.setExecutor(null);
+    httpServer.start();
+  }
+
+  /**
+   * Handler when accessing the landing page for the server.
+   */
+  static class GetEditor implements HttpHandler {
+
+    @Override
+    public void handle(HttpExchange t) {
+      URI uri = t.getRequestURI();
+      try {
+        try {
+          String path = uri.getPath();
+          LOG.debug(path);
+          if (path.endsWith("/")) {
+            path += "index.html";
+          }
+          path = path.replaceFirst("^/", "");
+          LOG.debug("resource path to look for = " + path);
+          LOG.debug("resource URL = " + getClass().getResource(path));
+          InputStream fs = getClass().getResourceAsStream(path);
+          if (fs == null) {
+            // Object does not exist or is not a file: reject
+            // with 404 error.
+            String response = "404 (Not Found)\n";
+            t.sendResponseHeaders(404, response.length());
+            OutputStream os = t.getResponseBody();
+            os.write(response.getBytes());
+            os.close();
+          } else {
+            // Object exists and is a file: accept with response
+            // code 200.
+            t.sendResponseHeaders(200, 0);
+            OutputStream os = t.getResponseBody();
+            final byte[] buffer = new byte[0x10000];
+            int count = 0;
+            while ((count = fs.read(buffer)) >= 0) {
+              os.write(buffer, 0, count);
+            }
+            fs.close();
+            os.close();
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          t.sendResponseHeaders(404, 0);
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Returns the list of vertices debugged in a given Superstep for a given job.
+   *
+   * URL parameters: {jobId, superstepId}
+   * Both parameters are mandatory; superstepId must be an integer >= -1.
+   */
+  static class GetVertices extends ServerHttpHandler {
+    /**
+     * Responds with a JSON array of the debugged vertex ids, or with an error
+     * status and message when the parameters are invalid or the traces cannot
+     * be read.
+     *
+     * @param httpExchange the http exchange object.
+     * @param paramMap map of URL parameters.
+     */
+    @Override
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+      // CHECKSTYLE: stop IllegalCatch
+      try {
+        // jobId and superstepId are mandatory. Validate.
+        if (jobId == null || superstepId == null) {
+          throw new IllegalArgumentException("Missing mandatory params.");
+        }
+        // May throw NumberFormatException. Handled below.
+        long superstepNo = Long.parseLong(superstepId);
+        if (superstepNo < -1) {
+          throw new NumberFormatException("Superstep must be integer >= -1.");
+        }
+        // May throw IOException. Handled below.
+        List<String> vertexIds = ServerUtils.getVerticesDebugged(jobId,
+          superstepNo, DebugTrace.VERTEX_ALL);
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        // Returns output as an array ["id1", "id2", "id3" .... ]
+        this.response = new JSONArray(vertexIds).toString();
+      } catch (Exception e) {
+        // The message must name both parameters checked above.
+        this.handleException(e, String.format(
+          "Invalid parameters. %s and %s are mandatory parameter.",
+          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+      }
+      // CHECKSTYLE: resume IllegalCatch
+    }
+  }
+
+  /**
+   * Returns the supersteps traced for the given job.
+   *
+   * URL parameters: {jobId}
+   */
+  static class GetSupersteps extends ServerHttpHandler {
+    /**
+     * Responds with a JSON array of the debugged superstep numbers, or with
+     * an error status and message when jobId is missing or the traces cannot
+     * be read.
+     *
+     * @param httpExchange the http exchange object.
+     * @param paramMap map of URL parameters.
+     */
+    @Override
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      // CHECKSTYLE: stop IllegalCatch
+      try {
+        // jobId is the only mandatory parameter. Validate.
+        if (jobId == null) {
+          throw new IllegalArgumentException("Missing mandatory params.");
+        }
+        // May throw IOException. Handled below.
+        List<Long> superstepIds = ServerUtils.getSuperstepsDebugged(jobId);
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        // Returns output as an array [id1, id2, id3 .... ]
+        this.response = new JSONArray(superstepIds).toString();
+      } catch (Exception e) {
+        // Only jobId is required here, so only jobId is named.
+        this.handleException(e, String.format(
+          "Invalid parameters. %s is a mandatory parameter.",
+          ServerUtils.JOB_ID_KEY));
+      }
+      // CHECKSTYLE: resume IllegalCatch
+    }
+  }
+
+  /**
+   * Returns the scenario for a given superstep of a given job.
+   *
+   * URL Params: {jobId, superstepId, [vertexId], [raw]}
+   * vertexId: vertexId is optional. It can be a single value or a comma
+   *       separated list. If it is not supplied, returns the scenario for all
+   *       vertices. If 'raw' parameter is specified, returns the raw protocol
+   *       buffer.
+   */
+  static class GetScenario extends ServerHttpHandler {
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+      // Check both jobId and superstepId are present
+      // CHECKSTYLE: stop IllegalCatch
+      try {
+        if (jobId == null || superstepId == null) {
+          throw new IllegalArgumentException("Missing mandatory parameters");
+        }
+        Long superstepNo = Long.parseLong(paramMap
+          .get(ServerUtils.SUPERSTEP_ID_KEY));
+        if (superstepNo < -1) {
+          this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+          this.response = String.format("%s must be an integer >= -1.",
+            ServerUtils.SUPERSTEP_ID_KEY);
+          return;
+        }
+        List<String> vertexIds = null;
+        // Get the single vertexId or the list of vertexIds (comma-separated).
+        String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+        // No vertex Id supplied. Return scenario for all vertices.
+        if (rawVertexIds == null) {
+          // Read scenario for all vertices.
+          // May throw IOException. Handled below.
+          vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
+            DebugTrace.VERTEX_ALL);
+        } else {
+          // Split the vertices by comma.
+          vertexIds = Lists.newArrayList(rawVertexIds.split(","));
+        }
+        // Send JSON by default.
+        JSONObject scenarioObj = new JSONObject();
+        for (String vertexId : vertexIds) {
+          GiraphVertexScenarioWrapper giraphScenarioWrapper;
+          giraphScenarioWrapper = ServerUtils.readScenarioFromTrace(jobId,
+            superstepNo, vertexId.trim(), DebugTrace.VERTEX_REGULAR);
+          scenarioObj.put(vertexId,
+            ServerUtils.scenarioToJSON(giraphScenarioWrapper));
+        }
+        // Set status as OK and convert JSONObject to string.
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        this.response = scenarioObj.toString();
+      } catch (Exception e) {
+        this.handleException(e, String.format(
+          "Invalid parameters. %s and %s are mandatory parameter.",
+          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+      }
+      // CHECKSTYLE: stop IllegalCatch
+    }
+  }
+
+  /**
+   * Returns the JAVA code for vertex scenario.
+   *
+   * URL Params: {jobId, superstepId, vertexId, traceType}
+   * traceType: Can be one of reg, err, msg or vv
+   */
+  static class GetVertexTest extends ServerHttpHandler {
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+      String vertexId = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+      String traceType = paramMap.get(ServerUtils.VERTEX_TEST_TRACE_TYPE_KEY);
+      // Check both jobId, superstepId and vertexId are present
+      try {
+        if (jobId == null || superstepId == null || vertexId == null ||
+          traceType == null) {
+          throw new IllegalArgumentException("Missing mandatory parameters");
+        }
+        Long superstepNo = Long.parseLong(paramMap
+          .get(ServerUtils.SUPERSTEP_ID_KEY));
+        if (superstepNo < -1) {
+          throw new NumberFormatException();
+        }
+        DebugTrace debugTrace = DebuggerUtils
+          .getVertexDebugTraceForPrefix(traceType);
+        // Send JSON by default.
+        GiraphVertexScenarioWrapper giraphScenarioWrapper = ServerUtils
+          .readScenarioFromTrace(jobId, superstepNo, vertexId.trim(),
+            debugTrace);
+        ComputationComputeTestGenerator testGenerator =
+          new ComputationComputeTestGenerator();
+        String testClassName = String.format("%sTest_%s_S%s_V%s",
+          giraphScenarioWrapper.getVertexScenarioClassesWrapper()
+            .getClassUnderTest().getSimpleName(), jobId, superstepId, vertexId);
+        // Set the content-disposition header to force a download with the
+        // given filename.
+        String filename = String.format("%s.java", testClassName);
+        this.setResponseHeader("Content-Disposition",
+          String.format("attachment; filename=\"%s\"", filename));
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        this.responseContentType = MediaType.TEXT_PLAIN;
+        this.response = testGenerator
+          .generateTest(giraphScenarioWrapper,
+            null /* testPackage is optional */, testClassName);
+      } catch (Exception e) {
+        this.handleException(e, String.format(
+          "Invalid parameters. %s, %s and %s are mandatory parameter.",
+          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
+          ServerUtils.VERTEX_ID_KEY));
+      }
+    }
+  }
+
+  /**
+   * Returns the JAVA code for master scenario.
+   *
+   * @URLParams : {jobId, superstepId}
+   */
+  static class GetMasterTest extends ServerHttpHandler {
+    @Override
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+      // Check both jobId, superstepId and vertexId are present
+      try {
+        if (jobId == null || superstepId == null) {
+          throw new IllegalArgumentException("Missing mandatory parameters");
+        }
+        Long superstepNo = Long.parseLong(paramMap
+          .get(ServerUtils.SUPERSTEP_ID_KEY));
+        if (superstepNo < -1) {
+          this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+          this.response = String.format("%s must be an integer >= -1.",
+            ServerUtils.SUPERSTEP_ID_KEY);
+          return;
+        }
+        // Send JSON by default.
+        GiraphMasterScenarioWrapper giraphScenarioWrapper = ServerUtils
+          .readMasterScenarioFromTrace(jobId, superstepNo,
+            DebugTrace.MASTER_ALL);
+        MasterComputeTestGenerator masterTestGenerator =
+          new MasterComputeTestGenerator();
+        // Set the content-disposition header to force a download with the
+        // given filename.
+        String testClassName = String.format("%sTest_%s_S%s",
+          giraphScenarioWrapper.getMasterClassUnderTest()
+            .replaceFirst(".*\\.", ""), jobId, superstepId);
+        String filename = String.format("%s.java", testClassName);
+        this.setResponseHeader("Content-Disposition",
+          String.format("attachment; filename=\"%s\"", filename));
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        this.responseContentType = MediaType.TEXT_PLAIN;
+        this.response = masterTestGenerator.generateTest(giraphScenarioWrapper,
+          null /* testPackage is optional */, testClassName);
+      } catch (Exception e) {
+        this.handleException(e, String.format(
+          "Invalid parameters. %s and %s are mandatory parameter.",
+          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY));
+      }
+    }
+  }
+
+  /**
+   * Returns the integrity violations based on the requested parameter. The
+   * requested parameter (violationType) must be one of M (message
+   * violations), V (vertex integrity violations) or E (vertex exceptions).
+   *
+   * URL Params: {jobId, superstepId, violationType, [vertexId]}
+   * vertexId: optional, and only read when violationType = E. It can be a
+   *       single value or a comma separated list. If it is not supplied,
+   *       exceptions for all vertices are returned.
+   */
+  static class GetIntegrity extends ServerHttpHandler {
+    /**
+     * The server returns only a limited number of msg or vertex value
+     * violations. For message violations, it may not put the limit at exactly
+     * this number because it reads each violation trace which may include
+     * multiple message violations and adds all the violations in the trace to
+     * the response. Once the total message violations is over this number it
+     * stops reading traces.
+     */
+    private static final int NUM_VIOLATIONS_THRESHOLD = 50;
+
+    /**
+     * Responds with a JSON object keyed by task id (type M) or vertex id
+     * (types V and E). Missing parameters, a malformed superstepId and an
+     * unknown violationType are answered with 400.
+     *
+     * @param httpExchange the http exchange object.
+     * @param paramMap map of URL parameters.
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String jobId = paramMap.get(ServerUtils.JOB_ID_KEY);
+      String superstepId = paramMap.get(ServerUtils.SUPERSTEP_ID_KEY);
+      String violationType = paramMap
+        .get(ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY);
+      // CHECKSTYLE: stop IllegalCatch
+      try {
+        if (jobId == null || superstepId == null || violationType == null) {
+          throw new IllegalArgumentException("Missing mandatory parameters");
+        }
+        // May throw NumberFormatException. Handled below.
+        long superstepNo = Long.parseLong(superstepId);
+        if (superstepNo < -1) {
+          throw new NumberFormatException();
+        }
+        // JSON object that will be finally returned.
+        JSONObject integrityObj = new JSONObject();
+        if ("M".equals(violationType)) {
+          // Message violation
+          List<String> taskIds = ServerUtils.getTasksWithIntegrityViolations(
+            jobId, superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL);
+          int numViolations = 0;
+          for (String taskId : taskIds) {
+            MsgIntegrityViolationWrapper msgIntegrityViolationWrapper =
+              ServerUtils.readMsgIntegrityViolationFromTrace(jobId, taskId,
+                superstepNo);
+            integrityObj.put(taskId,
+              ServerUtils.msgIntegrityToJson(msgIntegrityViolationWrapper));
+            numViolations += msgIntegrityViolationWrapper.numMsgWrappers();
+            if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
+              break;
+            }
+          }
+        } else if ("V".equals(violationType)) {
+          // Vertex integrity violation: one trace per vertex.
+          List<String> vertexIds = ServerUtils.getVerticesDebugged(jobId,
+            superstepNo, DebugTrace.INTEGRITY_VERTEX);
+          int numViolations = 0;
+          for (String vertexId : vertexIds) {
+            GiraphVertexScenarioWrapper giraphVertexScenarioWrapper =
+              ServerUtils.readVertexIntegrityViolationFromTrace(jobId,
+                superstepNo, vertexId);
+            numViolations++;
+            integrityObj.put(vertexId,
+              ServerUtils.vertexIntegrityToJson(giraphVertexScenarioWrapper));
+            if (numViolations >= NUM_VIOLATIONS_THRESHOLD) {
+              break;
+            }
+          }
+        } else if ("E".equals(violationType)) {
+          List<String> vertexIds;
+          // Get the single vertexId or the list of vertexIds (comma-separated).
+          String rawVertexIds = paramMap.get(ServerUtils.VERTEX_ID_KEY);
+          if (rawVertexIds == null) {
+            // No vertex Id supplied. Read exceptions for all vertices.
+            vertexIds = ServerUtils.getVerticesDebugged(jobId, superstepNo,
+              DebugTrace.VERTEX_EXCEPTION);
+          } else {
+            // Split the vertices by comma.
+            vertexIds = Lists.newArrayList(rawVertexIds.split(","));
+          }
+          for (String vertexId : vertexIds) {
+            GiraphVertexScenarioWrapper giraphScenarioWrapper =
+              ServerUtils.readScenarioFromTrace(jobId, superstepNo,
+                vertexId.trim(), DebugTrace.VERTEX_EXCEPTION);
+            integrityObj.put(vertexId,
+              ServerUtils.scenarioToJSON(giraphScenarioWrapper));
+          }
+        } else {
+          // Unknown type: answer explicitly, otherwise neither the status nor
+          // the response would be set for this request.
+          this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+          this.response = String.format("%s must be one of M, V or E.",
+            ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY);
+          return;
+        }
+        // Set status as OK and convert JSONObject to string.
+        this.response = integrityObj.toString();
+        this.statusCode = HttpURLConnection.HTTP_OK;
+      } catch (Exception e) {
+        this.handleException(e, String.format(
+          "Invalid parameters. %s, %s and %s are mandatory parameter.",
+          ServerUtils.JOB_ID_KEY, ServerUtils.SUPERSTEP_ID_KEY,
+          ServerUtils.INTEGRITY_VIOLATION_TYPE_KEY));
+      }
+      // CHECKSTYLE: resume IllegalCatch
+    }
+  }
+
+  /**
+   * Returns the TestGraph JAVA code.
+   *
+   * URL Param: adjList - Adjacency list of the graph
+   */
+  static class GetTestGraph extends ServerHttpHandler {
+    @Override
+    public void processRequest(HttpExchange httpExchange,
+      Map<String, String> paramMap) {
+      String adjList = paramMap.get(ServerUtils.ADJLIST_KEY);
+      // Check both jobId and superstepId are present
+      try {
+        if (adjList == null) {
+          throw new IllegalArgumentException("Missing mandatory parameters");
+        }
+        TestGraphGenerator testGraphGenerator = new TestGraphGenerator();
+        String testGraph = testGraphGenerator.generate(adjList.split("\n"));
+        this.setResponseHeader("Content-Disposition",
+          "attachment; filename=graph.java");
+        this.statusCode = HttpURLConnection.HTTP_OK;
+        this.responseContentType = MediaType.TEXT_PLAIN;
+        this.response = testGraph;
+      } catch (Exception e) {
+        this.handleException(e, String.format(
+          "Invalid parameters. %s is mandatory parameter.",
+          ServerUtils.ADJLIST_KEY));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
new file mode 100644
index 0000000..50ebb23
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerHttpHandler.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.log4j.Logger;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+/**
+ * The Abstract class for HTTP handlers.
+ */
+public abstract class ServerHttpHandler implements HttpHandler {
+
+  /**
+   * Logger for this class.
+   */
+  private static final Logger LOG = Logger.getLogger(ServerHttpHandler.class);
+  /**
+   * Response body.
+   */
+  protected String response;
+  /**
+   * Response body as a byte array
+   */
+  protected byte[] responseBytes;
+  /**
+   * Response status code. Please use HttpUrlConnection final static members.
+   */
+  protected int statusCode;
+  /**
+   * MimeType of the response. Please use MediaType final static members.
+   */
+  protected String responseContentType;
+  /**
+   * HttpExchange object received in the handle call.
+   */
+  protected HttpExchange httpExchange;
+
+  /**
+   * Handles an HTTP call's lifecycle - read parameters, process and send
+   * response.
+   * @param httpExchange the http exchange object.
+   */
+  @Override
+  public void handle(HttpExchange httpExchange) throws IOException {
+    // Assign class members so that subsequent methods can use it.
+    this.httpExchange = httpExchange;
+    // Set application/json as the default content type.
+    this.responseContentType = MediaType.APPLICATION_JSON;
+    String rawUrl = httpExchange.getRequestURI().getQuery();
+    Map<String, String> paramMap;
+    try {
+      paramMap = ServerUtils.getUrlParams(rawUrl);
+      // Call the method implemented by inherited classes.
+      LOG.info(httpExchange.getRequestURI().getPath() + paramMap.toString());
+      processRequest(httpExchange, paramMap);
+    } catch (UnsupportedEncodingException ex) {
+      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+      this.response = "Malformed URL. Given encoding is not supported.";
+    }
+    // In case of an error statusCode, we just write the exception string.
+    // (Consider using JSON).
+    if (this.statusCode != HttpURLConnection.HTTP_OK) {
+      this.responseContentType = MediaType.TEXT_PLAIN;
+    }
+    // Set mandatory Response Headers.
+    this.setMandatoryResponseHeaders();
+    this.writeResponse();
+  }
+
+  /**
+   * Writes the text response.
+   */
+  private void writeResponse() throws IOException {
+    OutputStream os = this.httpExchange.getResponseBody();
+    if (this.responseContentType == MediaType.APPLICATION_JSON ||
+      this.responseContentType == MediaType.TEXT_PLAIN) {
+      this.httpExchange.sendResponseHeaders(this.statusCode,
+        this.response.length());
+      os.write(this.response.getBytes());
+    } else if (this.responseContentType == MediaType.APPLICATION_OCTET_STREAM) {
+      this.httpExchange.sendResponseHeaders(this.statusCode,
+        this.responseBytes.length);
+      os.write(this.responseBytes);
+    }
+    os.close();
+  }
+
+  /**
+   * Add mandatory headers to the HTTP response by the debugger server. MUST be
+   * called before sendResponseHeaders.
+   */
+  private void setMandatoryResponseHeaders() {
+    // TODO(vikesh): **REMOVE CORS FOR ALL AFTER DECIDING THE DEPLOYMENT
+    // ENVIRONMENT**
+    Headers headers = this.httpExchange.getResponseHeaders();
+    headers.add("Access-Control-Allow-Origin", "*");
+    headers.add("Content-Type", this.responseContentType);
+  }
+
+  /**
+   * Sets the given headerKey to the given headerValue.
+   *
+   * @param headerKey - Header Key
+   * @param headerValue - Header Value.
+   * @desc - For example, call like this to set the Content-disposition header
+   *       setResponseHeader("Content-disposition", "attachment");
+   */
+  protected void setResponseHeader(String headerKey, String headerValue) {
+    Headers responseHeaders = this.httpExchange.getResponseHeaders();
+    responseHeaders.add(headerKey, headerValue);
+  }
+
+  /**
+   * Handle the common exceptions in processRequest.
+   *
+   * @param e thrown exception.
+   * @param illegalArgumentMessage - Message when illegal argument
+   *         exception is thrown. Optional - May be null.
+   */
+  protected void handleException(Exception e, String illegalArgumentMessage) {
+    e.printStackTrace();
+    LOG.error(e);
+    if (e instanceof NumberFormatException) {
+      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+      this.response = String.format("%s must be an integer >= -1.",
+        ServerUtils.SUPERSTEP_ID_KEY);
+    } else if (e instanceof IllegalArgumentException) {
+      this.statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
+      this.response = illegalArgumentMessage;
+    } else if (e instanceof FileNotFoundException) {
+      this.statusCode = HttpURLConnection.HTTP_NOT_FOUND;
+      this.response = "File not found on the server. Please ensure this " +
+        "vertex/master was debugged.";
+    } else if (e instanceof IOException ||
+      e instanceof InstantiationException ||
+      e instanceof IllegalAccessException ||
+      e instanceof ClassNotFoundException) {
+      this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
+      this.response = "Internal Server Error.";
+    } else {
+      LOG.error("Unknown Exception: " + e.toString());
+      this.statusCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
+      this.response = "Unknown exception occured.";
+    }
+  }
+
+  /**
+   * Implement this method in inherited classes. This method MUST set statusCode
+   * and response (or responseBytes) class members appropriately. In case the
+   * Content type is not JSON, must specify the new Content type. Default type
+   * is application/json. Non-200 Status is automatically assigned text/plain.
+   *
+   * @param httpExchange the http exchange object within which the paramters
+   *                     will be set.
+   * @param paramMap map of parameters.
+   */
+  public abstract void processRequest(HttpExchange httpExchange,
+    Map<String, String> paramMap);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
new file mode 100644
index 0000000..d48d5ca
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/ServerUtils.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.gui;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.giraph.debugger.utils.AggregatedValueWrapper;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.NeighborWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper.OutgoingMessageWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper.ExtendedOutgoingMessageWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility methods for Debugger Server.
+ */
+@SuppressWarnings("rawtypes")
+public class ServerUtils {
+  /**
+   * Key of the job id parameter.
+   */
+  public static final String JOB_ID_KEY = "jobId";
+  /**
+   * Key of the vertex id parameter.
+   */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /**
+   * Key of the superstep id parameter.
+   */
+  public static final String SUPERSTEP_ID_KEY = "superstepId";
+  /**
+   * Key of the parameter that specifies the type of integrity violation.
+   */
+  public static final String INTEGRITY_VIOLATION_TYPE_KEY = "type";
+  /**
+   * Key of the task id parameter.
+   */
+  public static final String TASK_ID_KEY = "taskId";
+  /**
+   * Key of the parameter that specifies the trace type, i.e.,
+   * {@link DebugTrace}.
+   */
+  public static final String VERTEX_TEST_TRACE_TYPE_KEY = "traceType";
+  /**
+   * Key of the adjacency list parameter.
+   */
+  public static final String ADJLIST_KEY = "adjList";
+
+  /**
+   * Logger for this class.
+   */
+  private static final Logger LOG = Logger.getLogger(ServerUtils.class);
+
+  /**
+   * Cached FileSystem opened by {@link #getFileSystem()}. Stays null until
+   * that method is called for the first time; it is assigned there without
+   * any synchronization.
+   */
+  private static FileSystem FILE_SYSTEM_CACHED;
+
+  /**
+   * Private constructor to disallow construction: this class only offers
+   * static members.
+   */
+  private ServerUtils() { }
+
+  /**
+   * Returns the parameters of a URL query string in a hash map. The input is
+   * split on {@code &} into {@code key=value} pairs directly, e.g.
+   * {@code key1=val1&key2=val2&key3=val3}. Keys and values are URL-decoded as
+   * UTF-8.
+   *
+   * <p>Only the first {@code =} of a pair separates key from value, so a value
+   * may itself contain {@code =}. A pair without a value ({@code key} or
+   * {@code key=}) is mapped to the empty string, and empty pairs (as produced
+   * by {@code a=1&&b=2} or an empty input) are skipped.
+   *
+   * @param rawUrl the parameters to parse; may be null or empty.
+   * @return the parameters on the url; empty if there are none.
+   * @throws UnsupportedEncodingException if UTF-8 decoding is not supported.
+   */
+  public static Map<String, String> getUrlParams(String rawUrl)
+    throws UnsupportedEncodingException {
+    HashMap<String, String> paramMap = new HashMap<String, String>();
+    if (rawUrl == null || rawUrl.isEmpty()) {
+      return paramMap;
+    }
+    for (String param : rawUrl.split("&")) {
+      if (param.isEmpty()) {
+        // Nothing between two separators; there is no key to record.
+        continue;
+      }
+      // Limit of 2: split on the first '=' only and keep a trailing empty
+      // value, so parts[1] is present whenever the pair contains '='.
+      String[] parts = param.split("=", 2);
+      String paramKey = URLDecoder.decode(parts[0], "UTF-8");
+      String paramValue = parts.length > 1 ?
+        URLDecoder.decode(parts[1], "UTF-8") : "";
+      paramMap.put(paramKey, paramValue);
+    }
+    return paramMap;
+  }
+
+  /**
+   * Hands out the {@link FileSystem} used by this class. The instance is
+   * created from a default {@link Configuration} on the first call and the
+   * same instance is returned afterwards. Note: We assume that the classpath
+   * contains the Hadoop's conf directory or the core-site.xml and
+   * hdfs-site.xml configuration files.
+   *
+   * @return a {@link FileSystem} object to be used to read from HDFS.
+   * @throws IOException if the file system cannot be obtained.
+   */
+  public static FileSystem getFileSystem() throws IOException {
+    if (FILE_SYSTEM_CACHED != null) {
+      return FILE_SYSTEM_CACHED;
+    }
+    FILE_SYSTEM_CACHED = FileSystem.get(new Configuration());
+    return FILE_SYSTEM_CACHED;
+  }
+
+  /**
+   * Locates the jar of the given job in the local jar cache, copying it from
+   * HDFS first if it is not cached yet. The jar is identified by the first
+   * line of the {@code jar.signature} file under the job's trace root.
+   *
+   * <p>This is best effort: any {@link IOException} is logged and an empty
+   * array is returned, as is the case when the signature file is empty.
+   *
+   * @param jobId id of the job, whose jar path will be returned.
+   * @return a url wrapped inside an array for convenience, or an empty array
+   *         if the jar could not be determined.
+   */
+  public static URL[] getCachedJobJarPath(String jobId) {
+    // read the jar signature file under the TRACE_ROOT/jobId/
+    Path jarSignaturePath = new Path(DebuggerUtils.getTraceFileRoot(jobId) +
+      "/" + "jar.signature");
+    try {
+      FileSystem fs = getFileSystem();
+      try (FSDataInputStream jarSignatureInput = fs.open(jarSignaturePath)) {
+        List<String> lines = IOUtils.readLines(jarSignatureInput);
+        if (!lines.isEmpty()) {
+          String jarSignature = lines.get(0);
+          // check if jar is already in JARCACHE_LOCAL
+          File localFile = new File(DebuggerUtils.JARCACHE_LOCAL + "/" +
+            jarSignature + ".jar");
+          if (!localFile.exists()) {
+            // otherwise, download from HDFS
+            Path hdfsPath = new Path(fs.getUri().resolve(
+              DebuggerUtils.JARCACHE_HDFS + "/" + jarSignature + ".jar"));
+            LOG.info("Copying from HDFS: " + hdfsPath + " to " + localFile);
+            localFile.getParentFile().mkdirs();
+            fs.copyToLocalFile(hdfsPath, new Path(localFile.toURI()));
+          }
+          return new URL[] { localFile.toURI().toURL() };
+        }
+      }
+    } catch (IOException e) {
+      // gracefully ignore if we failed to read the jar.signature; pass the
+      // exception to the logger so that its stack trace is not lost.
+      LOG.warn("An IOException is thrown but will be ignored: " +
+        e.toString(), e);
+    }
+    return new URL[0];
+  }
+
+  /**
+   * Builds the path of a per-vertex trace file: the job's trace root followed
+   * by the file name obtained by filling the trace type's file format with
+   * the superstep number and the vertex id.
+   *
+   * @param jobId id of the job.
+   * @param superstepNo superstep number.
+   * @param vertexId id of the vertex.
+   * @param debugTrace must be one of VERTEX_REGULAR, VERTEX_EXCEPTION,
+   *        INTEGRITY_VERTEX or INTEGRITY_MESSAGE_SINGLE_VERTEX.
+   * @return path of the vertex trace file on HDFS.
+   */
+  public static String getVertexTraceFilePath(String jobId, long superstepNo,
+    String vertexId, DebugTrace debugTrace) {
+    // INTEGRITY_MESSAGE_SINGLE_VERTEX is accepted because
+    // readScenarioFromTrace() passes it here when asked for VERTEX_ALL;
+    // rejecting it made that lookup fail whenever assertions are enabled.
+    // NOTE(review): presumably its file format takes (superstepNo, vertexId)
+    // like the other types - confirm in DebuggerUtils#getTraceFileFormat.
+    assert EnumSet.of(DebugTrace.VERTEX_EXCEPTION, DebugTrace.VERTEX_REGULAR,
+      DebugTrace.INTEGRITY_VERTEX,
+      DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX).contains(debugTrace);
+    return String.format("%s/%s", DebuggerUtils.getTraceFileRoot(jobId), String
+      .format(DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo,
+        vertexId));
+  }
+
+  /**
+   * Builds the path of a per-task message integrity trace file: the job's
+   * trace root followed by the file name obtained by filling the trace type's
+   * file format with the task id and the superstep number.
+   *
+   * @param jobId id of the job.
+   * @param taskId id of the task.
+   * @param superstepNo superstep number.
+   * @param debugTrace must be INTEGRITY_MESSAGE_ALL.
+   * @return path of the integrity trace file on HDFS.
+   */
+  public static String getIntegrityTraceFilePath(String jobId, String taskId,
+    long superstepNo, DebugTrace debugTrace) {
+    assert debugTrace == DebugTrace.INTEGRITY_MESSAGE_ALL;
+    String traceRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    String fileNameFormat = DebuggerUtils.getTraceFileFormat(debugTrace);
+    String fileName = String.format(fileNameFormat, taskId, superstepNo);
+    return traceRoot + "/" + fileName;
+  }
+
+  /**
+   * Builds the path of a master compute trace file: the job's trace root
+   * followed by the file name obtained by filling the trace type's file
+   * format with the superstep number.
+   *
+   * @param jobId id of the job.
+   * @param superstepNo superstep number.
+   * @param debugTrace must be of type MASTER_*.
+   * @return path of the master compute trace file on HDFS.
+   */
+  public static String getMasterTraceFilePath(String jobId, long superstepNo,
+    DebugTrace debugTrace) {
+    assert debugTrace == DebugTrace.MASTER_ALL ||
+      debugTrace == DebugTrace.MASTER_EXCEPTION ||
+      debugTrace == DebugTrace.MASTER_REGULAR;
+    String traceRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    String fileNameFormat = DebuggerUtils.getTraceFileFormat(debugTrace);
+    String fileName = String.format(fileNameFormat, superstepNo);
+    return traceRoot + "/" + fileName;
+  }
+
+  /**
+   * Reads the protocol buffer trace corresponding to the given jobId,
+   * superstepNo and vertexId and returns it as a vertex scenario.
+   *
+   * <p>When debugTrace is VERTEX_ALL, the trace files of the types
+   * VERTEX_REGULAR, VERTEX_EXCEPTION, INTEGRITY_VERTEX and
+   * INTEGRITY_MESSAGE_SINGLE_VERTEX are tried and the first one found is
+   * returned. Otherwise only the trace file of the given type is tried.
+   *
+   * @param jobId ID of the job debugged.
+   * @param superstepNo Superstep number debugged.
+   * @param vertexId ID of the vertex debugged.
+   * @param debugTrace Can be any one of VERTEX_* and
+   *        INTEGRITY_MESSAGE_SINGLE_VERTEX.
+   * @return the vertex scenario stored in the trace file represented as a
+   *        {@link GiraphVertexScenarioWrapper} object.
+   * @throws FileNotFoundException if none of the candidate trace files
+   *         exists.
+   */
+  public static GiraphVertexScenarioWrapper readScenarioFromTrace(String jobId,
+    long superstepNo, String vertexId, DebugTrace debugTrace)
+    throws IOException, ClassNotFoundException, InstantiationException,
+    IllegalAccessException {
+    FileSystem fs = ServerUtils.getFileSystem();
+    GiraphVertexScenarioWrapper giraphScenarioWrapper =
+      new GiraphVertexScenarioWrapper();
+    EnumSet<DebugTrace> enumSet = EnumSet.of(debugTrace);
+    if (debugTrace == DebugTrace.VERTEX_ALL) {
+      enumSet = EnumSet.of(DebugTrace.VERTEX_REGULAR,
+        DebugTrace.VERTEX_EXCEPTION, DebugTrace.INTEGRITY_VERTEX,
+        DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX);
+    }
+    // The job jar does not depend on the trace type, so look it up once
+    // rather than re-reading the jar signature for every candidate file.
+    URL[] jobJarPath = getCachedJobJarPath(jobId);
+    // Loops through all possible debug traces and returns the first one found.
+    for (DebugTrace enumValue : enumSet) {
+      String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId,
+        superstepNo, vertexId, enumValue);
+      try {
+        // If scenario is found, return it.
+        giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, jobJarPath);
+        return giraphScenarioWrapper;
+      } catch (FileNotFoundException e) {
+        // Ignore the exception since we will try reading another traceType
+        // again.
+        LOG.info("readScenarioFromTrace: File not found. Ignoring.");
+      }
+    }
+    // None of the debugTrace types were found. Throw exception.
+    throw new FileNotFoundException("Debug Trace not found.");
+  }
+
+  /**
+   * Reads the master protocol buffer trace corresponding to the given jobId and
+   * superstepNo and returns the GiraphMasterScenarioWrapper object.
+   *
+   * @param jobId ID of the job debugged.
+   * @param superstepNo Superstep number debugged.
+   * @param debugTrace Can be either MASTER_REGULAR, MASTER_EXCEPTION or
+   *        MASTER_ALL. In case of MASTER_ALL, the regular trace is tried
+   *        first and the exception trace is used if no regular trace exists.
+   * @return the master scenario stored in the trace file represented as a
+   *        {@link GiraphMasterScenarioWrapper} object.
+   * @throws IllegalArgumentException if debugTrace is not one of the MASTER_*
+   *         types.
+   * @throws FileNotFoundException if the requested trace file does not exist.
+   */
+  public static GiraphMasterScenarioWrapper readMasterScenarioFromTrace(
+    String jobId, long superstepNo, DebugTrace debugTrace) throws IOException,
+    ClassNotFoundException, InstantiationException, IllegalAccessException {
+    if (!EnumSet.of(DebugTrace.MASTER_ALL, DebugTrace.MASTER_EXCEPTION,
+      DebugTrace.MASTER_REGULAR).contains(debugTrace)) {
+      // Throw exception for unsupported debug trace. The message names the
+      // values that are actually accepted above.
+      throw new IllegalArgumentException(
+        "DebugTrace type is invalid. Use MASTER_REGULAR, MASTER_EXCEPTION " +
+        "or MASTER_ALL");
+    }
+    FileSystem fs = ServerUtils.getFileSystem();
+    GiraphMasterScenarioWrapper giraphScenarioWrapper =
+      new GiraphMasterScenarioWrapper();
+    // The job jar is the same for both trace files, so look it up only once.
+    URL[] jobJarPath = getCachedJobJarPath(jobId);
+    // For each superstep, there is either a "regular" master trace (saved in
+    // master_reg_stp_i.tr files), or an "exception" master trace (saved in
+    // master_err_stp_i.tr files). We first check to see if a regular master
+    // trace is available. If not, then we check to see if an exception master
+    // trace is available.
+    if (debugTrace == DebugTrace.MASTER_REGULAR ||
+      debugTrace == DebugTrace.MASTER_ALL) {
+      String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId,
+        superstepNo, DebugTrace.MASTER_REGULAR);
+      try {
+        giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, jobJarPath);
+        // If scenario is found, return it.
+        return giraphScenarioWrapper;
+      } catch (FileNotFoundException e) {
+        // If debugTrace is MASTER_ALL, ignore this exception since we will
+        // try reading the exception trace below.
+        if (debugTrace == DebugTrace.MASTER_ALL) {
+          LOG.info("readMasterScenarioFromTrace: Regular file not found. " +
+            "Ignoring.");
+        } else {
+          throw e;
+        }
+      }
+    }
+    // This code is reached only when debugTrace is MASTER_EXCEPTION or
+    // MASTER_ALL. In case of MASTER_ALL, it is only reached when the regular
+    // trace was not found.
+    String traceFilePath = ServerUtils.getMasterTraceFilePath(jobId,
+      superstepNo, DebugTrace.MASTER_EXCEPTION);
+    giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath, jobJarPath);
+    return giraphScenarioWrapper;
+  }
+
+  /**
+   * Loads the message integrity violations recorded for a task in a
+   * superstep. The trace file of type INTEGRITY_MESSAGE_ALL is read from HDFS
+   * using the job's cached jar as class path.
+   *
+   * @param jobId id of the job.
+   * @param taskId id of the task.
+   * @param superstepNo superstep number.
+   * @return the {@link MsgIntegrityViolationWrapper} from trace file.
+   */
+  public static MsgIntegrityViolationWrapper readMsgIntegrityViolationFromTrace(
+    String jobId, String taskId, long superstepNo) throws IOException,
+    ClassNotFoundException, InstantiationException, IllegalAccessException {
+    FileSystem hdfs = ServerUtils.getFileSystem();
+    String path = ServerUtils.getIntegrityTraceFilePath(jobId, taskId,
+      superstepNo, DebugTrace.INTEGRITY_MESSAGE_ALL);
+    MsgIntegrityViolationWrapper violations =
+      new MsgIntegrityViolationWrapper();
+    URL[] jobJarPath = getCachedJobJarPath(jobId);
+    violations.loadFromHDFS(hdfs, path, jobJarPath);
+    return violations;
+  }
+
+  /**
+   * Loads the vertex value integrity violation recorded for a vertex in a
+   * superstep. The trace file of type INTEGRITY_VERTEX is read from HDFS
+   * using the job's cached jar as class path, like the other trace readers
+   * of this class do.
+   *
+   * @param jobId id of the job.
+   * @param superstepNo superstep number.
+   * @param vertexId id of the vertex.
+   * @return the vertex integrity data from the trace file stored inside
+   *    {@link GiraphVertexScenarioWrapper}.
+   */
+  public static GiraphVertexScenarioWrapper
+  readVertexIntegrityViolationFromTrace(String jobId, long superstepNo,
+    String vertexId) throws IOException,
+    ClassNotFoundException, InstantiationException, IllegalAccessException {
+    FileSystem fs = ServerUtils.getFileSystem();
+    String traceFilePath = ServerUtils.getVertexTraceFilePath(jobId,
+      superstepNo, vertexId, DebugTrace.INTEGRITY_VERTEX);
+    GiraphVertexScenarioWrapper giraphScenarioWrapper =
+      new GiraphVertexScenarioWrapper();
+    // Pass the job jar so that this reader is consistent with
+    // readScenarioFromTrace(), which loads the same wrapper type with it.
+    // NOTE(review): presumably the jar is needed to resolve the job's own
+    // classes stored in the trace - confirm against loadFromHDFS.
+    giraphScenarioWrapper.loadFromHDFS(fs, traceFilePath,
+      getCachedJobJarPath(jobId));
+    return giraphScenarioWrapper;
+  }
+
+  /**
+   * Renders a vertex scenario as JSON. The resulting object carries the keys
+   * "vertexId", "vertexValue", "outgoingMessages" (an object keyed by the
+   * destination id), "incomingMessages", "neighbors" (objects with
+   * "neighborId" and "edgeValue"), "aggregators" and, only if the scenario
+   * has an exception, "exception" (with "message" and "stackTrace").
+   *
+   * @param giraphScenarioWrapper Giraph Scenario object.
+   * @return scenario data stored as json.
+   */
+  public static JSONObject scenarioToJSON(
+    GiraphVertexScenarioWrapper giraphScenarioWrapper) throws JSONException {
+    VertexContextWrapper context = giraphScenarioWrapper.getContextWrapper();
+    JSONObject json = new JSONObject();
+    json.put("vertexId", context.getVertexIdWrapper());
+    json.put("vertexValue", context.getVertexValueAfterWrapper());
+
+    // Outgoing messages: destination id -> message, both as strings.
+    JSONObject outgoing = new JSONObject();
+    for (Object element : context.getOutgoingMessageWrappers()) {
+      OutgoingMessageWrapper sent = (OutgoingMessageWrapper) element;
+      String destination = sent.getDestinationId().toString();
+      outgoing.put(destination, sent.getMessage().toString());
+    }
+
+    // Incoming messages, as strings.
+    ArrayList<String> incoming = new ArrayList<String>();
+    for (Object received : context.getIncomingMessageWrappers()) {
+      incoming.add(received.toString());
+    }
+
+    // Neighbors, each with its id and the value of the edge leading to it.
+    JSONArray neighbors = new JSONArray();
+    for (Object element : context.getNeighborWrappers()) {
+      NeighborWrapper neighbor = (NeighborWrapper) element;
+      JSONObject neighborJson = new JSONObject();
+      neighborJson.put("neighborId", neighbor.getNbrId());
+      neighborJson.put("edgeValue", neighbor.getEdgeValue());
+      neighbors.put(neighborJson);
+    }
+
+    json.put("outgoingMessages", outgoing);
+    json.put("incomingMessages", incoming);
+    json.put("neighbors", neighbors);
+
+    // The exception is optional.
+    if (giraphScenarioWrapper.hasExceptionWrapper()) {
+      ExceptionWrapper failure = giraphScenarioWrapper.getExceptionWrapper();
+      JSONObject failureJson = new JSONObject();
+      failureJson.put("message", failure.getErrorMessage());
+      failureJson.put("stackTrace", failure.getStackTrace());
+      json.put("exception", failureJson);
+    }
+
+    // Previous aggregated values: key -> value.
+    JSONObject aggregators = new JSONObject();
+    for (Object element : context.getCommonVertexMasterContextWrapper()
+      .getPreviousAggregatedValues()) {
+      AggregatedValueWrapper aggregated = (AggregatedValueWrapper) element;
+      aggregators.put(aggregated.getKey(), aggregated.getValue());
+    }
+    json.put("aggregators", aggregators);
+    return json;
+  }
+
+  /**
+   * Renders message integrity violations as JSON. The resulting object
+   * carries the "superstepId" and a "violations" list whose entries have the
+   * keys "srcId", "destinationId" and "message".
+   *
+   * @param msgIntegrityViolationWrapper {@link MsgIntegrityViolationWrapper}
+   *        object.
+   * @return message integrity violation data stored as json.
+   */
+  public static JSONObject msgIntegrityToJson(
+    MsgIntegrityViolationWrapper msgIntegrityViolationWrapper)
+    throws JSONException {
+    JSONObject json = new JSONObject();
+    ArrayList<JSONObject> violations = new ArrayList<JSONObject>();
+    json.put("superstepId", msgIntegrityViolationWrapper.getSuperstepNo());
+    // One JSON object per offending message.
+    for (Object element : msgIntegrityViolationWrapper
+      .getExtendedOutgoingMessageWrappers()) {
+      ExtendedOutgoingMessageWrapper offending =
+        (ExtendedOutgoingMessageWrapper) element;
+      JSONObject violation = new JSONObject();
+      violation.put("srcId", offending.getSrcId());
+      violation.put("destinationId", offending.getDestinationId());
+      violation.put("message", offending.getMessage());
+      violations.add(violation);
+    }
+    json.put("violations", violations);
+    return json;
+  }
+
+  /**
+   * Renders a vertex value integrity violation as JSON. The resulting object
+   * carries the keys "vertexId" and "vertexValue", both taken from the
+   * scenario's vertex context.
+   *
+   * @param giraphVertexScenarioWrapper {@link GiraphVertexScenarioWrapper}
+   *        object storing the vertex value violation data.
+   * @return vertex integrity violation data stored as json.
+   */
+  public static JSONObject vertexIntegrityToJson(
+    GiraphVertexScenarioWrapper giraphVertexScenarioWrapper)
+    throws JSONException {
+    JSONObject json = new JSONObject();
+    VertexContextWrapper context =
+      giraphVertexScenarioWrapper.getContextWrapper();
+    json.put("vertexId", context.getVertexIdWrapper());
+    json.put("vertexValue", context.getVertexValueAfterWrapper());
+    return json;
+  }
+
+  /**
+   * Lists the vertices that were debugged in a superstep by matching the
+   * names of the files under the job's trace root against the file format of
+   * the given trace type. File names follow the
+   * {@code <prefix>_stp_<superstepNo>_vid_<vertexId>.tr} naming convention.
+   *
+   * @param jobId id of the job.
+   * @param superstepNo superstep number.
+   * @param debugTrace type of vertex trace files.
+   * @return the ids of the vertices that have a matching trace file.
+   * @throws FileNotFoundException if listing the trace root yields null.
+   */
+  public static List<String> getVerticesDebugged(String jobId,
+    long superstepNo, DebugTrace debugTrace) throws IOException {
+    ArrayList<String> debuggedVertices = new ArrayList<String>();
+    FileSystem hdfs = ServerUtils.getFileSystem();
+    String traceRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    // Turn the file name format into a regex whose vertex id slot is a
+    // capturing group.
+    Pattern fileNamePattern = Pattern.compile(String.format(
+      DebuggerUtils.getTraceFileFormat(debugTrace), superstepNo, "(.*?)"));
+    // Hadoop listStatus returns null when path is not found.
+    FileStatus[] traceFiles = hdfs.listStatus(new Path(traceRoot));
+    if (traceFiles == null) {
+      throw new FileNotFoundException("Debug trace file not found.");
+    }
+    // VERTEX_ALL debug trace has one extra group to match the prefix
+    // -reg|err, which shifts the vertex id to the second group.
+    // FIXME XXX this relies on inside knowledge of the patterns defined in
+    // DebuggerUtils#getTraceFileFormat() to extract the vertex id part.
+    int vertexIdGroup = debugTrace == DebugTrace.VERTEX_ALL ? 2 : 1;
+    // Check every file in the trace directory against the pattern.
+    for (FileStatus traceFile : traceFiles) {
+      Matcher match = fileNamePattern.matcher(traceFile.getPath().getName());
+      if (!match.find()) {
+        continue;
+      }
+      debuggedVertices.add(match.group(vertexIdGroup));
+    }
+    return debuggedVertices;
+  }
+
+  /**
+   * Lists the tasks that have an integrity violation trace in a superstep by
+   * matching the names of the files under the job's trace root against the
+   * file format of the given trace type.
+   *
+   * @param jobId id of the job.
+   * @param superstepNo superstep number.
+   * @param debugTrace must be INTEGRITY_MESSAGE_ALL or INTEGRITY_VERTEX.
+   * @return the IDs of all the tasks that caused the given integrity violation.
+   * @throws FileNotFoundException if listing the trace root yields null.
+   */
+  public static List<String> getTasksWithIntegrityViolations(String jobId,
+    long superstepNo, DebugTrace debugTrace) throws IOException {
+    assert debugTrace == DebugTrace.INTEGRITY_MESSAGE_ALL ||
+      debugTrace == DebugTrace.INTEGRITY_VERTEX;
+    ArrayList<String> violatingTasks = new ArrayList<String>();
+    FileSystem hdfs = ServerUtils.getFileSystem();
+    String traceRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    // Turn the file name format into a regex whose task id slot is a
+    // capturing group.
+    // NOTE(review): the arguments are given as (taskId, superstepNo), while
+    // getVertexTraceFilePath() fills the INTEGRITY_VERTEX format with
+    // (superstepNo, vertexId) - confirm INTEGRITY_VERTEX really works here.
+    Pattern fileNamePattern = Pattern.compile(String.format(
+      DebuggerUtils.getTraceFileFormat(debugTrace), "(.*?)", superstepNo));
+    // Hadoop listStatus returns null when path is not found.
+    FileStatus[] traceFiles = hdfs.listStatus(new Path(traceRoot));
+    if (traceFiles == null) {
+      throw new FileNotFoundException("Debug trace file not found.");
+    }
+    // Check every file in the trace directory against the pattern and keep
+    // the task id of each match.
+    for (FileStatus traceFile : traceFiles) {
+      Matcher match = fileNamePattern.matcher(traceFile.getPath().getName());
+      if (!match.find()) {
+        continue;
+      }
+      violatingTasks.add(match.group(1));
+    }
+    return violatingTasks;
+  }
+
+  /**
+   * @param jobId id of the job.
+   * @return the list of supersteps for which there is an exception or regular
+   * trace.
+   */
+  public static List<Long> getSuperstepsDebugged(String jobId)
+    throws IOException {
+    Set<Long> superstepIds = Sets.newHashSet();
+    FileSystem fs = ServerUtils.getFileSystem();
+    String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    // Use this regex to match the file name and capture the vertex id.
+    String regex = "(reg|err|msg_intgrty|vv_intgrty)_stp_(.*?)_vid_(.*?).tr$";
+    Pattern p = Pattern.compile(regex);
+    Path pt = new Path(traceFileRoot);
+    // Iterate through each file in this directory and match the regex.
+    for (FileStatus fileStatus : fs.listStatus(pt)) {
+      String fileName = fileStatus.getPath().getName();
+      Matcher m = p.matcher(fileName);
+      // Add this vertex id if there is a match.
+      if (m.find()) {
+        superstepIds.add(Long.parseLong(m.group(2)));
+      }
+    }
+    return Lists.newArrayList(superstepIds);
+  }
+
+  /**
+   * @param jobId id of the job.
+   * @return the list of supersteps for which there is an exception or regular
+   * trace.
+   */
+  public static List<Long> getSuperstepsMasterDebugged(String jobId)
+    throws IOException {
+    Set<Long> superstepIds = Sets.newHashSet();
+    FileSystem fs = ServerUtils.getFileSystem();
+    String traceFileRoot = DebuggerUtils.getTraceFileRoot(jobId);
+    // Use this regex to match the file name and capture the vertex id.
+    String regex = "master_.*_stp_(\\d+?).tr$";
+    Pattern p = Pattern.compile(regex);
+    Path pt = new Path(traceFileRoot);
+    // Iterate through each file in this directory and match the regex.
+    for (FileStatus fileStatus : fs.listStatus(pt)) {
+      String fileName = fileStatus.getPath().getName();
+      Matcher m = p.matcher(fileName);
+      // Add this vertex id if there is a match.
+      if (m.find()) {
+        superstepIds.add(Long.parseLong(m.group(1)));
+      }
+    }
+    return Lists.newArrayList(superstepIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
new file mode 100644
index 0000000..c3aaec0
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/gui/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes to run Giraph debugger GUI.
+ */
+package org.apache.giraph.debugger.gui;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
new file mode 100644
index 0000000..b6656fa
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingComputation.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.instrumenter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Type;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.debugger.DebugConfig;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper;
+import org.apache.giraph.debugger.utils.GiraphVertexScenarioWrapper.VertexContextWrapper;
+import org.apache.giraph.debugger.utils.MsgIntegrityViolationWrapper;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.AbstractComputation;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Class that intercepts calls to AbstractComputation's exposed methods for
+ * GiraphDebugger.
+ *
+ * @param <I>
+ *          Vertex id
+ * @param <V>
+ *          Vertex data
+ * @param <E>
+ *          Edge data
+ * @param <M1>
+ *          Incoming message type
+ * @param <M2>
+ *          Outgoing message type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public abstract class AbstractInterceptingComputation<
+  I extends WritableComparable, V extends Writable, E extends Writable,
+  M1 extends Writable, M2 extends Writable>
+  extends AbstractComputation<I, V, E, M1, M2> {
+
+
+  /**
+   * Configuration key for the class name of the class that extends DebugConfig.
+   */
+  public static final String CONFIG_CLASS_KEY = "giraph.debugger.configClass";
+
+  /**
+   * Giraph configuration for specifying the DebugConfig class.
+   */
+  public static final StrConfOption DEBUG_CONFIG_CLASS = new StrConfOption(
+    CONFIG_CLASS_KEY, DebugConfig.class.getName(),
+    "The name of the Debug Config class for the computation (e.g. " +
+      "org.apache.giraph.debugger.examples.SimpleShortestPathsDebugConfig).");
+
+  /**
+   * Logger for this class.
+   */
+  protected static final Logger LOG = Logger
+    .getLogger(AbstractInterceptingComputation.class);
+
+  /**
+   * A flag to indicate whether this Computation class was already initialized.
+   */
+  protected static boolean IS_INITIALIZED;
+  /**
+   * Whether DEBUG_CONFIG tells to check message constraints.
+   */
+  protected static boolean SHOULD_CHECK_MESSAGE_INTEGRITY;
+  /**
+   * Whether DEBUG_CONFIG tells to check vertex value constraints.
+   */
+  protected static boolean SHOULD_CHECK_VERTEX_VALUE_INTEGRITY;
+  /**
+   * Whether DEBUG_CONFIG tells to catch exceptions.
+   */
+  protected static boolean SHOULD_CATCH_EXCEPTIONS;
+
+  /**
+   * Configuration key for the path to the jar signature.
+   */
+  private static final String JAR_SIGNATURE_KEY =
+    "giraph.debugger.jarSignature";
+
+  /**
+   * Limit on the number of violations to log. Starts at 5 and is replaced by
+   * the value from DebugConfig when this class is initialized.
+   */
+  private static int NUM_VIOLATIONS_TO_LOG = 5;
+  /**
+   * Limit on the number of vertices to log. Starts at 5 and is replaced by
+   * the value from DebugConfig when this class is initialized.
+   */
+  private static int NUM_VERTICES_TO_LOG = 5;
+  /**
+   * A counter for number of vertices already logged.
+   */
+  private static int NUM_VERTICES_LOGGED = 0;
+  /**
+   * A counter for number of vertex violations already logged.
+   */
+  private static int NUM_VERTEX_VIOLATIONS_LOGGED = -1;
+  /**
+   * A counter for number of message violations already logged.
+   */
+  private static int NUM_MESSAGE_VIOLATIONS_LOGGED = -1;
+
+  /**
+   * DebugConfig instance to be used for debugging.
+   */
+  private static DebugConfig DEBUG_CONFIG;
+
+  /**
+   * The vertex id type as in the I of Giraph's Computation&lt;I,V,E,M1,M2>.
+   */
+  private static Type VERTEX_ID_CLASS;
+  /**
+   * The vertex value type as in the V of Giraph's Computation&lt;I,V,E,M1,M2>.
+   */
+  private static Type VERTEX_VALUE_CLASS;
+  /**
+   * The edge value type as in the E of Giraph's Computation&lt;I,V,E,M1,M2>.
+   */
+  private static Type EDGE_VALUE_CLASS;
+  /**
+   * The incoming message type as in the M1 of Giraph's
+   * Computation&lt;I,V,E,M1,M2>.
+   */
+  private static Type INCOMING_MESSAGE_CLASS;
+  /**
+   * The outgoing message type as in the M2 of Giraph's
+   * Computation&lt;I,V,E,M1,M2>.
+   */
+  private static Type OUTGOING_MESSAGE_CLASS;
+
+  /**
+   * Contains previous aggregators that are available in the beginning of the
+   * superstep. In Giraph, these aggregators are immutable. NOTE: We currently
+   * only capture aggregators that are read by at least one vertex. If we want
+   * to capture all aggregators, we need to change Giraph code to get access
+   * to them.
+   */
+  private static CommonVertexMasterInterceptionUtil
+  COMMON_VERTEX_MASTER_INTERCEPTING_UTIL;
+
+  /**
+   * Whether or not this vertex was configured to be debugged. If so we will
+   * intercept its outgoing messages.
+   */
+  private boolean shouldDebugVertex;
+  /**
+   * Whether to stop intercepting compute() for the remaining vertices.
+   */
+  private boolean shouldStopInterceptingVertex;
+
+  /**
+   * For vertices that are configured to be debugged, we construct a
+   * GiraphVertexScenarioWrapper in the beginning and use it to intercept
+   * outgoing messages
+   */
+  private GiraphVertexScenarioWrapper<I, V, E, M1, M2>
+  giraphVertexScenarioWrapperForRegularTraces;
+
+  /**
+   * If a vertex has violated a message value constraint when it was sending a
+   * message, we set this to true so that the interceptComputeEnd() method
+   * makes sure to save a vertexScenario trace for it.
+   */
+  private boolean hasViolatedMsgValueConstraint;
+  /**
+   * Stores the value of a vertex before the compute method is called. If a
+   * vertex throws an exception, or violates a vertex or message value
+   * constraint, then we use this value as the previous vertex value when we
+   * save a vertexScenario trace for it.
+   */
+  private V previousVertexValue;
+  /**
+   * DataOutputBuffer for holding the previous vertex value.
+   */
+  private DataOutputBuffer previousVertexValueOutputBuffer =
+    new DataOutputBuffer();
+  /**
+   * DataInputBuffer for cloning what was preserved for previous vertex value.
+   */
+  private DataInputBuffer previousVertexValueInputBuffer =
+    new DataInputBuffer();
+  /**
+   * We keep the vertex under compute in case some functions need it, e.g.,
+   * sendMessage().
+   */
+  private Vertex<I, V, E> currentVertexUnderCompute;
+  /**
+   * The wrapped instance of message integrity violation.
+   */
+  private MsgIntegrityViolationWrapper<I, M2> msgIntegrityViolationWrapper;
+
+  /**
+   * Provides a way to access the actual Computation class. The returned class
+   * is handed to every GiraphVertexScenarioWrapper that this class creates for
+   * a captured vertex scenario.
+   *
+   * @return The actual Computation class
+   */
+  public abstract Class<? extends Computation<I, V, E, ? extends Writable,
+    ? extends Writable>> getActualTestedClass();
+
+  /**
+   * Initializes this class to start debugging.
+   */
+  protected final synchronized void
+  initializeAbstractInterceptingComputation() {
+    if (IS_INITIALIZED) {
+      return; // don't initialize twice
+    }
+    IS_INITIALIZED = true;
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL =
+      new CommonVertexMasterInterceptionUtil(
+        getContext().getJobID().toString());
+    String debugConfigClassName = DEBUG_CONFIG_CLASS.get(getConf());
+    LOG.info("initializing debugConfigClass: " + debugConfigClassName);
+    Class<?> clazz;
+    try {
+      clazz = Class.forName(debugConfigClassName);
+      DEBUG_CONFIG = (DebugConfig<I, V, E, M1, M2>) clazz.newInstance();
+      DEBUG_CONFIG.readConfig(getConf(), getTotalNumVertices(),
+        getContext().getJobID().getId());
+      VERTEX_ID_CLASS = getConf().getVertexIdClass();
+      VERTEX_VALUE_CLASS = getConf().getVertexValueClass();
+      EDGE_VALUE_CLASS = getConf().getEdgeValueClass();
+      INCOMING_MESSAGE_CLASS = getConf().getIncomingMessageValueClass();
+      OUTGOING_MESSAGE_CLASS = getConf().getOutgoingMessageValueClass();
+      // Set limits from DebugConfig
+      NUM_VERTICES_TO_LOG = DEBUG_CONFIG.getNumberOfVerticesToLog();
+      NUM_VIOLATIONS_TO_LOG = DEBUG_CONFIG.getNumberOfViolationsToLog();
+      // Reset counters
+      NUM_MESSAGE_VIOLATIONS_LOGGED = 0;
+      NUM_VERTEX_VIOLATIONS_LOGGED = 0;
+      NUM_VERTICES_LOGGED = 0;
+      // Cache DebugConfig flags
+      SHOULD_CATCH_EXCEPTIONS = DEBUG_CONFIG.shouldCatchExceptions();
+      SHOULD_CHECK_VERTEX_VALUE_INTEGRITY =
+        DEBUG_CONFIG.shouldCheckVertexValueIntegrity();
+      SHOULD_CHECK_MESSAGE_INTEGRITY =
+        DEBUG_CONFIG.shouldCheckMessageIntegrity();
+    } catch (InstantiationException | ClassNotFoundException |
+      IllegalAccessException e) {
+      LOG.error("Could not create a new DebugConfig instance of " +
+        debugConfigClassName);
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    if (getWorkerContext().getMyWorkerIndex() == getWorkerContext()
+      .getWorkerCount() - 1) {
+      // last worker records jar signature if necessary
+      String jarSignature = getConf().get(JAR_SIGNATURE_KEY);
+      if (jarSignature != null) {
+        Path jarSignaturePath = new Path(
+          DebuggerUtils.getTraceFileRoot(COMMON_VERTEX_MASTER_INTERCEPTING_UTIL
+            .getJobId()) + "/" + "jar.signature");
+        LOG.info("Recording jar signature (" + jarSignature + ") at " +
+          jarSignaturePath);
+        FileSystem fs = COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getFileSystem();
+        try {
+          if (!fs.exists(jarSignaturePath)) {
+            OutputStream f = fs.create(jarSignaturePath,
+              true).getWrappedStream();
+            IOUtils.write(jarSignature, f);
+            f.close();
+          }
+        } catch (IOException e) {
+          // When multiple workers try to write the jar.signature, some of them
+          // may cause
+          // AlreadyBeingCreatedException to be thrown, which we ignore.
+          e.printStackTrace();
+        }
+      }
+    }
+    LOG.info("done initializing debugConfigClass: " + debugConfigClassName);
+  }
+
+  /**
+   * Serializes the current value of the given vertex into the reusable output
+   * buffer, replacing whatever was preserved there before.
+   *
+   * @param vertex the vertex whose value is preserved
+   * @throws IOException
+   */
+  private void keepPreviousVertexValue(Vertex<I, V, E> vertex)
+    throws IOException {
+    V valueToPreserve = vertex.getValue();
+    previousVertexValueOutputBuffer.reset();
+    valueToPreserve.write(previousVertexValueOutputBuffer);
+  }
+
+  /**
+   * Restores the preserved vertex value by reading the preserved bytes back
+   * into a vertex value object.
+   *
+   * @return The previous vertex value. The same instance is created once and
+   *   then refilled and returned on every call.
+   * @throws IOException
+   */
+  private V getPreviousVertexValue() throws IOException {
+    if (previousVertexValue == null) {
+      previousVertexValue = getConf().createVertexValue();
+    }
+    byte[] preservedBytes = previousVertexValueOutputBuffer.getData();
+    int preservedLength = previousVertexValueOutputBuffer.getLength();
+    previousVertexValueInputBuffer.reset(preservedBytes, preservedLength);
+    previousVertexValue.readFields(previousVertexValueInputBuffer);
+    return previousVertexValue;
+  }
+
+  /**
+   * @return true as soon as any one of the logged vertices, logged vertex
+   *   violations or logged message violations counters has reached its
+   *   limit.
+   */
+  private boolean hasInterceptedEnough() {
+    boolean enoughVertices = NUM_VERTICES_LOGGED >= NUM_VERTICES_TO_LOG;
+    boolean enoughVertexViolations =
+      NUM_VERTEX_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG;
+    boolean enoughMessageViolations =
+      NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG;
+    return enoughVertices || enoughVertexViolations ||
+      enoughMessageViolations;
+  }
+
+  /**
+   * Called before {@link Computation#preSuperstep()} to prepare a message
+   * integrity violation wrapper.
+   * @return true if compute() does not need to be intercepted for this
+   *  superstep.
+   */
+  protected final boolean interceptPreSuperstepBegin() {
+    // LOG.info("before preSuperstep");
+    NUM_VERTICES_LOGGED = 0;
+    NUM_VERTEX_VIOLATIONS_LOGGED = 0;
+    NUM_MESSAGE_VIOLATIONS_LOGGED = 0;
+    if (!DEBUG_CONFIG.shouldDebugSuperstep(getSuperstep()) ||
+      hasInterceptedEnough()) {
+      shouldStopInterceptingVertex = true;
+      return true;
+    }
+    if (SHOULD_CHECK_VERTEX_VALUE_INTEGRITY) {
+      LOG.info("creating a vertexValueViolationWrapper. superstepNo: " +
+        getSuperstep());
+    }
+
+    if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
+      LOG.info("creating a msgIntegrityViolationWrapper. superstepNo: " +
+        getSuperstep());
+      msgIntegrityViolationWrapper = new MsgIntegrityViolationWrapper<>(
+        (Class<I>) VERTEX_ID_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
+      msgIntegrityViolationWrapper.setSuperstepNo(getSuperstep());
+    }
+
+    // LOG.info("before preSuperstep done");
+    shouldStopInterceptingVertex = false;
+    return false;
+  }
+
+  /**
+   * Called immediately when the compute() method is entered. Initializes data
+   * that will be required for debugging throughout the rest of the compute
+   * function: decides whether this vertex is traced, remembers the vertex for
+   * message checks and preserves its value from before compute().
+   *
+   * @param vertex The vertex that's about to be computed.
+   * @param messages The incoming messages for the vertex.
+   * @throws IOException
+   */
+  protected final void interceptComputeBegin(Vertex<I, V, E> vertex,
+    Iterable<M1> messages) throws IOException {
+    if (!IS_INITIALIZED) {
+      // TODO: Sometimes Giraph doesn't call initialize() and directly calls
+      // compute(). Here we guard against things not being initialized, which
+      // was causing null pointer exceptions. Find out when/why this happens.
+      LOG.warn("interceptComputeBegin is called but debugConfig is null." +
+        " Initializing AbstractInterceptingComputation again...");
+      initializeAbstractInterceptingComputation();
+    }
+    // The flag describes only the vertex under compute, so it is always
+    // cleared here. Clearing it only while below the violation limit would
+    // let a violation of an earlier vertex be attributed to this one.
+    hasViolatedMsgValueConstraint = false;
+    // A vertex should be debugged if:
+    // 1) the user configures the superstep to be debugged;
+    // 2) the user configures the vertex to be debugged; and
+    // 3) we have already debugged less than a threshold of vertices in this
+    // superstep.
+    shouldDebugVertex = NUM_VERTICES_LOGGED < NUM_VERTICES_TO_LOG &&
+      DEBUG_CONFIG.shouldDebugVertex(vertex, getSuperstep());
+    if (shouldDebugVertex) {
+      giraphVertexScenarioWrapperForRegularTraces = getGiraphVertexScenario(
+        vertex, vertex.getValue(), messages);
+    }
+    boolean canLogMessageViolation = SHOULD_CHECK_MESSAGE_INTEGRITY &&
+      NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG;
+    boolean canLogVertexViolation = SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
+      NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG;
+    // Keep a reference to the current vertex only when necessary.
+    if (canLogMessageViolation) {
+      currentVertexUnderCompute = vertex;
+    }
+    // Keep the previous value only when necessary.
+    if (SHOULD_CATCH_EXCEPTIONS || canLogVertexViolation ||
+      canLogMessageViolation) {
+      keepPreviousVertexValue(vertex);
+    }
+  }
+
+  /**
+   * Saves a vertex exception trace for an exception thrown by
+   * {@link Computation#compute(Vertex, Iterable)}. The trace holds the vertex
+   * value preserved before compute() as well as the message and stack trace
+   * of the exception.
+   *
+   * @param vertex The vertex that was being computed.
+   * @param messages The incoming messages for the vertex.
+   * @param e The exception thrown.
+   * @throws IOException
+   */
+  protected final void interceptComputeException(Vertex<I, V, E> vertex,
+    Iterable<M1> messages, Throwable e) throws IOException {
+    LOG.info("Caught an exception. message: " + e.getMessage() +
+      ". Saving a trace in HDFS.");
+    GiraphVertexScenarioWrapper<I, V, E, M1, M2> exceptionScenario =
+      getGiraphVertexScenario(vertex, getPreviousVertexValue(), messages);
+    exceptionScenario.setExceptionWrapper(new ExceptionWrapper(
+      e.getMessage(), ExceptionUtils.getStackTrace(e)));
+    String traceFileName = DebuggerUtils.getFullTraceFileName(
+      DebugTrace.VERTEX_EXCEPTION,
+      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
+      vertex.getId().toString());
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+      exceptionScenario, traceFileName);
+  }
+
+  /**
+   * Called after {@link Computation#compute(Vertex, Iterable)} returns. Saves
+   * the regular trace of a vertex being debugged and, when the vertex value or
+   * one of the sent messages broke a constraint, the matching integrity
+   * trace.
+   *
+   * @param vertex The vertex that was computed.
+   * @param messages The incoming messages for the vertex.
+   * @return true once enough has been intercepted, i.e. when compute() does
+   *         not need to be intercepted any more.
+   * @throws IOException
+   */
+  protected final boolean interceptComputeEnd(Vertex<I, V, E> vertex,
+    Iterable<M1> messages) throws IOException {
+    if (shouldDebugVertex) {
+      // The scenario was captured before compute(); add the resulting value.
+      giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+        .setVertexValueAfterWrapper(vertex.getValue());
+      String regularTraceFileName = DebuggerUtils.getFullTraceFileName(
+        DebugTrace.VERTEX_REGULAR,
+        COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
+        vertex.getId().toString());
+      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+        giraphVertexScenarioWrapperForRegularTraces, regularTraceFileName);
+      NUM_VERTICES_LOGGED++;
+    }
+
+    boolean vertexValueViolated = SHOULD_CHECK_VERTEX_VALUE_INTEGRITY &&
+      NUM_VERTEX_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG &&
+      !DEBUG_CONFIG.isVertexValueCorrect(vertex.getId(), vertex.getValue());
+    if (vertexValueViolated) {
+      initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
+        DebugTrace.INTEGRITY_VERTEX);
+      NUM_VERTEX_VIOLATIONS_LOGGED++;
+    }
+
+    if (hasViolatedMsgValueConstraint) {
+      initAndSaveGiraphVertexScenarioWrapper(vertex, messages,
+        DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX);
+      NUM_MESSAGE_VIOLATIONS_LOGGED++;
+    }
+
+    boolean interceptedEnough = hasInterceptedEnough();
+    shouldStopInterceptingVertex = interceptedEnough;
+    return interceptedEnough;
+  }
+
+  /**
+   * Called after {@link Computation#postSuperstep()} to save the message
+   * integrity violations captured during the superstep, if there are any.
+   *
+   * Nothing is saved when message integrity is not checked, when no violation
+   * wrapper has been created (interceptPreSuperstepBegin() only creates one
+   * for supersteps that are debugged) or when the wrapper is empty.
+   */
+  protected final void interceptPostSuperstepEnd() {
+    if (!SHOULD_CHECK_MESSAGE_INTEGRITY) {
+      return;
+    }
+    // Guard against a wrapper that was never created; dereferencing it
+    // unconditionally would throw a NullPointerException.
+    if (msgIntegrityViolationWrapper == null ||
+      msgIntegrityViolationWrapper.numMsgWrappers() <= 0) {
+      return;
+    }
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(
+      msgIntegrityViolationWrapper, DebuggerUtils
+        .getMessageIntegrityAllTraceFullFileName(getSuperstep(),
+          COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), UUID.randomUUID()
+            .toString()));
+  }
+
+  /**
+   * Builds a scenario for the given vertex, using the vertex value preserved
+   * before compute() as the previous value, and saves it as a trace of the
+   * given kind.
+   *
+   * @param vertex The vertex that was computed.
+   * @param messages The incoming messages for the vertex.
+   * @param debugTrace The debug trace to save.
+   * @throws IOException
+   */
+  private void initAndSaveGiraphVertexScenarioWrapper(Vertex<I, V, E> vertex,
+    Iterable<M1> messages, DebugTrace debugTrace) throws IOException {
+    V valueBeforeCompute = getPreviousVertexValue();
+    GiraphVertexScenarioWrapper<I, V, E, M1, M2> scenario =
+      getGiraphVertexScenario(vertex, valueBeforeCompute, messages);
+    String traceFileName = DebuggerUtils.getFullTraceFileName(debugTrace,
+      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.getJobId(), getSuperstep(),
+      vertex.getId().toString());
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.saveScenarioWrapper(scenario,
+      traceFileName);
+  }
+
+  /**
+   * Builds a scenario describing the given vertex: its id, its values before
+   * and after compute(), its outgoing edges, its incoming messages and the
+   * context shared between vertices and the master.
+   *
+   * The previous vertex value is passed in explicitly because some traces are
+   * captured lazily, after compute() has run, and then the previous value is
+   * a preserved copy that differs from the current value of the vertex. Other
+   * traces are captured up front, where both values are the same.
+   *
+   * @param vertex The vertex the scenario will capture.
+   * @param previousVertexValueToAssign The previous vertex value.
+   * @param messages The incoming messages for this superstep.
+   * @return A scenario for the given vertex.
+   * @throws IOException
+   */
+  private GiraphVertexScenarioWrapper<I, V, E, M1, M2> getGiraphVertexScenario(
+    Vertex<I, V, E> vertex, V previousVertexValueToAssign,
+    Iterable<M1> messages) throws IOException {
+    GiraphVertexScenarioWrapper<I, V, E, M1, M2> scenario =
+      new GiraphVertexScenarioWrapper(
+        getActualTestedClass(), (Class<I>) VERTEX_ID_CLASS,
+        (Class<V>) VERTEX_VALUE_CLASS, (Class<E>) EDGE_VALUE_CLASS,
+        (Class<M1>) INCOMING_MESSAGE_CLASS, (Class<M2>) OUTGOING_MESSAGE_CLASS);
+    VertexContextWrapper context = scenario.getContextWrapper();
+
+    context.setVertexValueBeforeWrapper(previousVertexValueToAssign);
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.initCommonVertexMasterContextWrapper(
+      getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
+    context.setCommonVertexMasterContextWrapper(
+      COMMON_VERTEX_MASTER_INTERCEPTING_UTIL
+        .getCommonVertexMasterContextWrapper());
+    context.setVertexIdWrapper(vertex.getId());
+
+    for (Edge<I, E> outgoingEdge : vertex.getEdges()) {
+      context.addNeighborWrapper(outgoingEdge.getTargetVertexId(),
+        outgoingEdge.getValue());
+    }
+    for (M1 incomingMessage : messages) {
+      context.addIncomingMessageWrapper(incomingMessage);
+    }
+    context.setVertexValueAfterWrapper(vertex.getValue());
+    return scenario;
+  }
+
+  /**
+   * First intercepts the sent message if necessary and calls and then calls
+   * AbstractComputation's sendMessage method.
+   *
+   * @param id
+   *          Vertex id to send the message to
+   * @param message
+   *          Message data to send
+   */
+  @Override
+  public void sendMessage(I id, M2 message) {
+    if (!shouldStopInterceptingVertex) {
+      if (shouldDebugVertex) {
+        giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+          .addOutgoingMessageWrapper(id, message);
+      }
+      if (SHOULD_CHECK_MESSAGE_INTEGRITY &&
+        NUM_MESSAGE_VIOLATIONS_LOGGED < NUM_VIOLATIONS_TO_LOG) {
+        I senderId = currentVertexUnderCompute.getId();
+        if (!DEBUG_CONFIG.isMessageCorrect(senderId, id, message,
+          getSuperstep())) {
+          msgIntegrityViolationWrapper.addMsgWrapper(
+            currentVertexUnderCompute.getId(), id, message);
+          NUM_MESSAGE_VIOLATIONS_LOGGED++;
+          hasViolatedMsgValueConstraint = true;
+        }
+      }
+    }
+    super.sendMessage(id, message);
+  }
+
+  /**
+   * Intercepts the message sent to all edges if necessary and then calls
+   * AbstractComputation's sendMessageToAllEdges method. While interception is
+   * on, the message is recorded once per edge for a vertex being debugged and
+   * checked once per target vertex until the violation limit is reached.
+   *
+   * @param vertex
+   *          Vertex whose edges to send the message to.
+   * @param message
+   *          Message sent to all edges.
+   */
+  @Override
+  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+    if (!shouldStopInterceptingVertex) {
+      if (shouldDebugVertex) {
+        for (Edge<I, E> outgoingEdge : vertex.getEdges()) {
+          giraphVertexScenarioWrapperForRegularTraces.getContextWrapper()
+            .addOutgoingMessageWrapper(outgoingEdge.getTargetVertexId(),
+              message);
+        }
+      }
+      if (SHOULD_CHECK_MESSAGE_INTEGRITY) {
+        I senderId = vertex.getId();
+        for (Edge<I, E> outgoingEdge : vertex.getEdges()) {
+          if (NUM_MESSAGE_VIOLATIONS_LOGGED >= NUM_VIOLATIONS_TO_LOG) {
+            break;
+          }
+          I targetId = outgoingEdge.getTargetVertexId();
+          if (!DEBUG_CONFIG.isMessageCorrect(senderId, targetId, message,
+            getSuperstep())) {
+            msgIntegrityViolationWrapper.addMsgWrapper(senderId, targetId,
+              message);
+            hasViolatedMsgValueConstraint = true;
+            NUM_MESSAGE_VIOLATIONS_LOGGED++;
+          }
+        }
+      }
+    }
+    super.sendMessageToAllEdges(vertex, message);
+  }
+
+  /**
+   * Returns the aggregated value from AbstractComputation and, while
+   * interception is on, records it so that it can be saved with the traces.
+   *
+   * @param <A> The type of the aggregated value.
+   * @param name The name of the aggregator.
+   * @return The aggregated value returned by AbstractComputation.
+   */
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    A aggregatedValue = super.<A>getAggregatedValue(name);
+    if (shouldStopInterceptingVertex) {
+      return aggregatedValue;
+    }
+    COMMON_VERTEX_MASTER_INTERCEPTING_UTIL.addAggregatedValueIfNotExists(name,
+      aggregatedValue);
+    return aggregatedValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
new file mode 100644
index 0000000..d6ed908
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/instrumenter/AbstractInterceptingMasterCompute.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.instrumenter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils;
+import org.apache.giraph.debugger.utils.DebuggerUtils.DebugTrace;
+import org.apache.giraph.debugger.utils.ExceptionWrapper;
+import org.apache.giraph.debugger.utils.GiraphMasterScenarioWrapper;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Class that intercepts calls to {@link MasterCompute}'s exposed methods for
+ * GiraphDebugger.
+ */
+public abstract class AbstractInterceptingMasterCompute extends MasterCompute {
+
+  /**
+   * Logger for this class.
+   */
+  protected static final Logger LOG = Logger
+    .getLogger(AbstractInterceptingMasterCompute.class);
+  /**
+   * The master scenario being captured.
+   */
+  private GiraphMasterScenarioWrapper giraphMasterScenarioWrapper;
+  /**
+   * The utility for intercepting master computes.
+   */
+  private CommonVertexMasterInterceptionUtil commonVertexMasterInterceptionUtil;
+
+  /**
+   * Called immediately as user's {@link MasterCompute#compute()} method is
+   * entered. Starts a new master scenario for this superstep, creating the
+   * interception utility on first use, and attaches the context shared
+   * between vertices and the master to the scenario.
+   */
+  public void interceptComputeBegin() {
+    // Name the method that is actually running; the message used to mention
+    // "interceptInitializeEnd".
+    LOG.info(this.getClass().getName() + ".interceptComputeBegin is called ");
+    giraphMasterScenarioWrapper = new GiraphMasterScenarioWrapper(this
+      .getClass().getName());
+    if (commonVertexMasterInterceptionUtil == null) {
+      commonVertexMasterInterceptionUtil = new
+        CommonVertexMasterInterceptionUtil(getContext().getJobID().toString());
+    }
+    commonVertexMasterInterceptionUtil.initCommonVertexMasterContextWrapper(
+      getConf(), getSuperstep(), getTotalNumVertices(), getTotalNumEdges());
+    giraphMasterScenarioWrapper
+      .setCommonVertexMasterContextWrapper(commonVertexMasterInterceptionUtil
+        .getCommonVertexMasterContextWrapper());
+  }
+
+  /**
+   * Stands in for {@link MasterCompute#getAggregatedValue(String)}: fetches
+   * the aggregator value from MasterCompute and records it, so that aggregator
+   * values are captured at each superstep.
+   *
+   * @param <A>
+   *          The type of the aggregator value.
+   * @param name
+   *          The name of the Giraph aggregator.
+   * @return The aggregator value returned by the original
+   *         {@link MasterCompute#getAggregatedValue(String)}.
+   */
+  @Intercept(renameTo = "getAggregatedValue")
+  public <A extends Writable> A getAggregatedValueIntercept(String name) {
+    A aggregatedValue = super.<A>getAggregatedValue(name);
+    commonVertexMasterInterceptionUtil.addAggregatedValueIfNotExists(name,
+      aggregatedValue);
+    return aggregatedValue;
+  }
+
+  /**
+   * Called when user's {@link MasterCompute#compute()} method throws an
+   * exception. Attaches the message and stack trace of the exception to the
+   * master scenario and saves it as a master exception trace.
+   *
+   * @param e
+   *          exception thrown.
+   */
+  protected final void interceptComputeException(Exception e) {
+    String exceptionMessage = e.getMessage();
+    LOG.info("Caught an exception in user's MasterCompute. message: " +
+      exceptionMessage + ". Saving a trace in HDFS.");
+    giraphMasterScenarioWrapper.setExceptionWrapper(new ExceptionWrapper(
+      e.getMessage(), ExceptionUtils.getStackTrace(e)));
+    String traceFileName = DebuggerUtils.getFullMasterTraceFileName(
+      DebugTrace.MASTER_EXCEPTION,
+      commonVertexMasterInterceptionUtil.getJobId(), getSuperstep());
+    commonVertexMasterInterceptionUtil.saveScenarioWrapper(
+      giraphMasterScenarioWrapper, traceFileName);
+  }
+
+  /**
+   * Called after user's {@link MasterCompute#compute()} method returns.
+   */
+  public void interceptComputeEnd() {
+    commonVertexMasterInterceptionUtil.saveScenarioWrapper(
+      giraphMasterScenarioWrapper, DebuggerUtils.getFullMasterTraceFileName(
+        DebugTrace.MASTER_REGULAR,
+        commonVertexMasterInterceptionUtil.getJobId(), getSuperstep()));
+  }
+
+  /**
+   * Reads nothing: the body is empty, so the given input is left untouched.
+   *
+   * @param arg0 the input, which is not read
+   * @throws IOException declared by the overridden method; nothing here
+   *           throws it
+   */
+  @Override
+  public void readFields(DataInput arg0) throws IOException {
+  }
+
+  /**
+   * Writes nothing: the body is empty, so the given output is left untouched.
+   *
+   * @param arg0 the output, which is not written to
+   * @throws IOException declared by the overridden method; nothing here
+   *           throws it
+   */
+  @Override
+  public void write(DataOutput arg0) throws IOException {
+  }
+}