You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/02/05 02:30:27 UTC

[2/2] drill git commit: DRILL-2114: Update profile page to provide query fragment state, node where query was run, and memory information. Also, extract giant class into smaller units.

DRILL-2114: Update profile page to provide query fragment state, node where query was run, and memory information.  Also, extract giant class into smaller units.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a9ee7911
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a9ee7911
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a9ee7911

Branch: refs/heads/master
Commit: a9ee79110450d3a68cb3600552207226594e5b58
Parents: 6331d35
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Jan 14 07:41:41 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Feb 4 16:28:03 2015 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentContext.java  |   6 +-
 .../apache/drill/exec/ops/FragmentStats.java    |   5 +-
 .../drill/exec/server/rest/DrillRestServer.java |   1 +
 .../exec/server/rest/ProfileResources.java      | 260 ---------
 .../drill/exec/server/rest/ProfileWrapper.java  | 541 -------------------
 .../exec/server/rest/profile/Comparators.java   |  93 ++++
 .../drill/exec/server/rest/profile/Filters.java |  41 ++
 .../server/rest/profile/FragmentWrapper.java    | 141 +++++
 .../rest/profile/OperatorPathBuilder.java       |  86 +++
 .../server/rest/profile/OperatorWrapper.java    | 117 ++++
 .../server/rest/profile/ProfileResources.java   | 260 +++++++++
 .../server/rest/profile/ProfileWrapper.java     | 138 +++++
 .../exec/server/rest/profile/TableBuilder.java  | 142 +++++
 .../apache/drill/exec/work/foreman/Foreman.java |   4 +-
 .../work/fragment/AbstractStatusReporter.java   |   2 +-
 15 files changed, 1031 insertions(+), 806 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index dc47f4e..e413921 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -88,7 +88,7 @@ public class FragmentContext implements Closeable {
 
   public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
       FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
-    this.stats = new FragmentStats(dbContext.getMetrics(), fragment.getAssignment());
+
     this.context = dbContext;
     this.connection = connection;
     this.fragment = fragment;
@@ -109,14 +109,16 @@ public class FragmentContext implements Closeable {
     } catch (Exception e) {
       throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
+
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
     try {
-      this.allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
+      this.allocator = dbContext.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
       assert (allocator != null);
     }catch(Throwable e){
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
     }
+    this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
 
     this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 4431235..b6b0a8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -32,14 +32,17 @@ public class FragmentStats {
   private List<OperatorStats> operators = Lists.newArrayList();
   private final long startTime;
   private final DrillbitEndpoint endpoint;
+  private final BufferAllocator allocator;
 
-  public FragmentStats(MetricRegistry metrics, DrillbitEndpoint endpoint) {
+  public FragmentStats(BufferAllocator allocator, MetricRegistry metrics, DrillbitEndpoint endpoint) {
     this.startTime = System.currentTimeMillis();
     this.endpoint = endpoint;
+    this.allocator = allocator;
   }
 
   public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) {
     prfB.setStartTime(startTime);
+    prfB.setMaxMemoryUsed(allocator.getPeakMemoryAllocation());
     prfB.setEndTime(System.currentTimeMillis());
     prfB.setEndpoint(endpoint);
     for(OperatorStats o : operators){

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index e8cb4ba..147e4a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.server.rest;
 
+import org.apache.drill.exec.server.rest.profile.ProfileResources;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.WorkManager;

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
deleted file mode 100644
index 58b3d4e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.server.rest;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.WorkManager;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
-import org.glassfish.jersey.server.mvc.Viewable;
-
-import com.google.common.collect.Lists;
-
-@Path("/")
-public class ProfileResources {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
-
-  @Inject WorkManager work;
-
-  public static class ProfileInfo implements Comparable<ProfileInfo> {
-    public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
-
-    private String queryId;
-    private Date time;
-    private String location;
-    private String foreman;
-    private String query;
-    private String state;
-
-    public ProfileInfo(String queryId, long time, String foreman, String query, String state) {
-      this.queryId = queryId;
-      this.time = new Date(time);
-      this.foreman = foreman;
-      this.location = "http://localhost:8047/profile/" + queryId + ".json";
-      this.query = query = query.substring(0,  Math.min(query.length(), 150));
-      this.state = state;
-    }
-
-    public String getQuery(){
-      return query;
-    }
-
-    public String getQueryId() {
-      return queryId;
-    }
-
-    public String getTime() {
-      return format.format(time);
-    }
-
-
-    public String getState() {
-      return state;
-    }
-
-    public String getLocation() {
-      return location;
-    }
-
-    @Override
-    public int compareTo(ProfileInfo other) {
-      return time.compareTo(other.time);
-    }
-
-    public String getForeman() {
-      return foreman;
-    }
-
-  }
-
-  private PStoreProvider provider(){
-    return work.getContext().getPersistentStoreProvider();
-  }
-
-  @XmlRootElement
-  public class QProfiles {
-    private List<ProfileInfo> runningQueries;
-    private List<ProfileInfo> finishedQueries;
-
-    public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries) {
-      this.runningQueries = runningQueries;
-      this.finishedQueries = finishedQueries;
-    }
-
-    public List<ProfileInfo> getRunningQueries() {
-      return runningQueries;
-    }
-
-    public List<ProfileInfo> getFinishedQueries() {
-      return finishedQueries;
-    }
-  }
-
-  @GET
-  @Path("/profiles.json")
-  @Produces(MediaType.APPLICATION_JSON)
-  public QProfiles getProfilesJSON() {
-    PStore<QueryProfile> completed = null;
-    PStore<QueryInfo> running = null;
-    try {
-      completed = provider().getStore(QueryStatus.QUERY_PROFILE);
-      running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
-    } catch (IOException e) {
-      logger.debug("Failed to get profiles from persistent or ephemeral store.");
-      return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
-    }
-
-    List<ProfileInfo> runningQueries = Lists.newArrayList();
-
-    for (Map.Entry<String, QueryInfo> entry : running) {
-      QueryInfo profile = entry.getValue();
-      runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
-    }
-
-    Collections.sort(runningQueries, Collections.reverseOrder());
-
-
-    List<ProfileInfo> finishedQueries = Lists.newArrayList();
-    for (Map.Entry<String, QueryProfile> entry : completed) {
-      QueryProfile profile = entry.getValue();
-      finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
-    }
-
-    return new QProfiles(runningQueries, finishedQueries);
-  }
-
-  @GET
-  @Path("/profiles")
-  @Produces(MediaType.TEXT_HTML)
-  public Viewable getProfiles() {
-    QProfiles profiles = getProfilesJSON();
-    return new Viewable("/rest/profile/list.ftl", profiles);
-  }
-
-  private QueryProfile getQueryProfile(String queryId) {
-    QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
-
-    // first check local running
-    Foreman f = work.getBee().getForemanForQueryId(id);
-    if(f != null){
-      return f.getQueryStatus().getAsProfile();
-    }
-
-    // then check remote running
-    try{
-      PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
-      QueryInfo info = runningQueries.get(queryId);
-      return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS);
-    }catch(Exception e){
-      logger.debug("Failure to find query as running profile.", e);
-    }
-
-    // then check blob store
-    try{
-      PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE);
-      return profiles.get(queryId);
-    }catch(Exception e){
-      logger.warn("Failure to load query profile for query {}", queryId, e);
-    }
-
-    // TODO: Improve error messaging.
-    return QueryProfile.getDefaultInstance();
-
-  }
-
-
-  @GET
-  @Path("/profiles/{queryid}.json")
-  @Produces(MediaType.APPLICATION_JSON)
-  public String getProfileJSON(@PathParam("queryid") String queryId) {
-    try {
-      return new String(QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
-    } catch (IOException e) {
-      logger.debug("Failed to serialize profile for: " + queryId);
-      return ("{ 'message' : 'error (unable to serialize profile)' }");
-    }
-  }
-
-  @GET
-  @Path("/profiles/{queryid}")
-  @Produces(MediaType.TEXT_HTML)
-  public Viewable getProfile(@PathParam("queryid") String queryId) {
-    ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
-
-    return new Viewable("/rest/profile/profile.ftl", wrapper);
-
-  }
-
-
-  @GET
-  @Path("/profiles/cancel/{queryid}")
-  @Produces(MediaType.TEXT_PLAIN)
-  public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
-
-    QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
-
-    // first check local running
-    Foreman f = work.getBee().getForemanForQueryId(id);
-    if(f != null){
-      f.cancel();
-      return String.format("Cancelled query %s on locally running node.", queryId);
-    }
-
-    // then check remote running
-    try{
-      PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
-      QueryInfo info = runningQueries.get(queryId);
-      Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
-      if(a.getOk()){
-        return String.format("Query %s canceled on node %s.", queryId, info.getForeman().getAddress());
-      }else{
-        return String.format("Attempted to cancel query %s on %s but the query is no longer active on that node.", queryId, info.getForeman().getAddress());
-      }
-    }catch(Exception e){
-      logger.debug("Failure to find query as running profile.", e);
-      return String.format("Failure attempting to cancel query %s.  Unable to find information about where query is actively running.", queryId);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
deleted file mode 100644
index 80c08d3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.server.rest;
-
-import java.text.DateFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class ProfileWrapper {
-
-  public QueryProfile profile;
-  public String id;
-
-  public ProfileWrapper(QueryProfile profile) {
-    this.profile = profile;
-    this.id = QueryIdHelper.getQueryId(profile.getId());
-  }
-
-  public QueryProfile getProfile() {
-    return profile;
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public String getQueryId() {
-    return QueryIdHelper.getQueryId(profile.getId());
-  }
-
-  public List<OperatorWrapper> getOperatorProfiles() {
-    List<OperatorWrapper> ows = Lists.newArrayList();
-    Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
-
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
-    Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
-
-      List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
-      Collections.sort(minors, Comparators.minorIdCompare);
-      for (MinorFragmentProfile minor : minors) {
-
-        List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
-        Collections.sort(ops, Comparators.operatorIdCompare);
-        for (OperatorProfile op : ops) {
-
-          ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
-              major.getMajorFragmentId(), op.getOperatorId());
-          if (!opmap.containsKey(ip)) {
-            List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
-            opmap.put(ip, l);
-          }
-          opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
-        }
-      }
-    }
-
-    List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
-    Collections.sort(keys);
-    ImmutablePair<OperatorProfile, Integer> val;
-    for (ImmutablePair<Integer, Integer> ip : keys) {
-      ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
-    }
-
-    return ows;
-  }
-
-  public List<FragmentWrapper> getFragmentProfiles() {
-    List<FragmentWrapper> fws = Lists.newArrayList();
-
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
-    Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
-      fws.add(new FragmentWrapper(major));
-    }
-
-    return fws;
-  }
-
-  public String getFragmentsOverview() {
-    final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (FragmentWrapper fw : getFragmentProfiles()) {
-      fw.addSummary(tb);
-    }
-    return tb.toString();
-  }
-
-  public String majorFragmentTimingProfile(MajorFragmentProfile major) {
-    final String[] columns = {"Minor Fragment", "Start", "End", "Total Time", "Max Records", "Max Batches"};
-    TableBuilder builder = new TableBuilder(columns);
-
-    ArrayList<MinorFragmentProfile> complete, incomplete;
-    complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
-        major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
-    incomplete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
-        major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
-
-    Collections.sort(complete, Comparators.minorIdCompare);
-    for (MinorFragmentProfile minor : complete) {
-      ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
-
-      long t0 = profile.getStart();
-      long biggestIncomingRecords = 0;
-      long biggestBatches = 0;
-
-      for (OperatorProfile op : ops) {
-        long incomingRecords = 0;
-        long batches = 0;
-        for (StreamProfile sp : op.getInputProfileList()) {
-          incomingRecords += sp.getRecords();
-          batches += sp.getBatches();
-        }
-        biggestIncomingRecords = Math.max(biggestIncomingRecords, incomingRecords);
-        biggestBatches = Math.max(biggestBatches, batches);
-      }
-
-      builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build(), null);
-      builder.appendMillis(minor.getStartTime() - t0, null);
-      builder.appendMillis(minor.getEndTime() - t0, null);
-      builder.appendMillis(minor.getEndTime() - minor.getStartTime(), null);
-
-      builder.appendInteger(biggestIncomingRecords, null);
-      builder.appendInteger(biggestBatches, null);
-    }
-    for (MinorFragmentProfile m : incomplete) {
-      builder.appendCell(
-          major.getMajorFragmentId() + "-"
-              + m.getMinorFragmentId(), null);
-      builder.appendRepeated(m.getState().toString(), null, 5);
-    }
-    return builder.toString();
-  }
-
-  public String getOperatorsOverview() {
-    final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (OperatorWrapper ow : getOperatorProfiles()) {
-      ow.addSummary(tb);
-    }
-    return tb.toString();
-  }
-
-  public String getOperatorsJSON() {
-    StringBuilder sb = new StringBuilder("{");
-    String sep = "";
-    for (CoreOperatorType op : CoreOperatorType.values()) {
-      sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
-      sep = ", ";
-    }
-    return sb.append("}").toString();
-  }
-
-  private static class OperatorPathBuilder {
-    private static final String OPERATOR_PATH_PATTERN = "%s-%s-%s";
-    private static final String DEFAULT = "xx";
-    private String major;
-    private String minor;
-    private String operator;
-
-    public OperatorPathBuilder() {
-      clear();
-    }
-
-    public void clear() {
-      major = DEFAULT;
-      minor = DEFAULT;
-      operator = DEFAULT;
-    }
-
-    // Utility to left pad strings
-    protected String leftPad(String text) {
-      return String.format("00%s", text).substring(text.length());
-    }
-
-    public OperatorPathBuilder setMajor(MajorFragmentProfile major) {
-      if (major!=null) {
-        return setMajor(major.getMajorFragmentId());
-      }
-      return this;
-    }
-
-    public OperatorPathBuilder setMajor(int newMajor) {
-      major = leftPad(String.valueOf(newMajor));
-      return this;
-    }
-
-    public OperatorPathBuilder setMinor(MinorFragmentProfile minor) {
-      if (minor!=null) {
-        return setMinor(minor.getMinorFragmentId());
-      }
-      return this;
-    }
-
-    public OperatorPathBuilder setMinor(int newMinor) {
-      minor = leftPad(String.valueOf(newMinor));
-      return this;
-    }
-
-    public OperatorPathBuilder setOperator(OperatorProfile op) {
-      if (op!=null) {
-        return setOperator(op.getOperatorId());
-      }
-      return this;
-    }
-
-    public OperatorPathBuilder setOperator(int newOp) {
-      operator = leftPad(String.valueOf(newOp));
-      return this;
-    }
-
-    public String build() {
-      StringBuffer sb = new StringBuffer();
-      return sb.append(major).append("-")
-          .append(minor).append("-")
-          .append(operator)
-          .toString();
-    }
-  }
-
-  public class FragmentWrapper {
-    private final MajorFragmentProfile major;
-
-    public FragmentWrapper(MajorFragmentProfile major) {
-      this.major = Preconditions.checkNotNull(major);
-    }
-
-    public String getDisplayName() {
-      return String.format("Major Fragment: %s", new OperatorPathBuilder().setMajor(major).build());
-    }
-
-    public String getId() {
-      return String.format("fragment-%s", major.getMajorFragmentId());
-    }
-
-    public void addSummary(TableBuilder tb) {
-      final String fmt = " (%d)";
-      long t0 = profile.getStart();
-
-      ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
-          Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
-
-      tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
-      tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
-
-      if (complete.size() < 1) {
-        tb.appendRepeated("", null, 7);
-        return;
-      }
-
-      int li = complete.size() - 1;
-
-      Collections.sort(complete, Comparators.startTimeCompare);
-      tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-      tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
-      Collections.sort(complete, Comparators.endTimeCompare);
-      tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-      tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
-      long total = 0;
-      for (MinorFragmentProfile p : complete) {
-        total += p.getEndTime() - p.getStartTime();
-      }
-      Collections.sort(complete, Comparators.runTimeCompare);
-      tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
-          String.format(fmt, complete.get(0).getMinorFragmentId()));
-      tb.appendMillis((long) (total / complete.size()), null);
-      tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
-          String.format(fmt, complete.get(li).getMinorFragmentId()));
-    }
-
-    public String getContent() {
-      return majorFragmentTimingProfile(major);
-    }
-  }
-
-  public class OperatorWrapper {
-    private final int major;
-    private List<ImmutablePair<OperatorProfile, Integer>> ops;
-
-    public OperatorWrapper(int major, List<ImmutablePair<OperatorProfile, Integer>> ops) {
-      assert ops.size() > 0;
-      this.major = major;
-      this.ops = ops;
-    }
-
-    public String getDisplayName() {
-      OperatorProfile op = ops.get(0).getLeft();
-      String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
-      CoreOperatorType operatorType = CoreOperatorType.valueOf(op.getOperatorType());
-      return String.format("%s - %s", path, operatorType == null ? "UKNOWN_OPERATOR" : operatorType.toString());
-    }
-
-    public String getId() {
-      return String.format("operator-%d-%d", major, ops.get(0).getLeft().getOperatorId());
-    }
-
-    public String getContent() {
-      final String [] columns = {"Minor Fragment", "Setup", "Process", "Wait", "Max Batches", "Max Records"};
-      TableBuilder builder = new TableBuilder(columns);
-
-      for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
-        int minor = ip.getRight();
-        OperatorProfile op = ip.getLeft();
-
-        String path = new OperatorPathBuilder().setMajor(major).setMinor(minor).setOperator(op).build();
-        builder.appendCell(path, null);
-        builder.appendNanos(op.getSetupNanos(), null);
-        builder.appendNanos(op.getProcessNanos(), null);
-        builder.appendNanos(op.getWaitNanos(), null);
-
-        long maxBatches = Long.MIN_VALUE;
-        long maxRecords = Long.MIN_VALUE;
-        for (StreamProfile sp : op.getInputProfileList()) {
-          maxBatches = Math.max(sp.getBatches(), maxBatches);
-          maxRecords = Math.max(sp.getRecords(), maxRecords);
-        }
-
-        builder.appendInteger(maxBatches, null);
-        builder.appendInteger(maxRecords, null);
-      }
-      return builder.toString();
-    }
-
-    public void addSummary(TableBuilder tb) {
-      OperatorProfile op = ops.get(0).getLeft();
-      String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
-      tb.appendCell(path, null);
-      CoreOperatorType operatorType = CoreOperatorType.valueOf(ops.get(0).getLeft().getOperatorType());
-      tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null);
-
-      int li = ops.size() - 1;
-      String fmt = " (%s)";
-
-      double setupSum = 0.0;
-      double processSum = 0.0;
-      double waitSum = 0.0;
-      for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
-        setupSum += ip.getLeft().getSetupNanos();
-        processSum += ip.getLeft().getProcessNanos();
-        waitSum += ip.getLeft().getWaitNanos();
-      }
-
-      Collections.sort(ops, Comparators.setupTimeSort);
-      tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
-      tb.appendNanos((long) (setupSum / ops.size()), null);
-      tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
-
-      Collections.sort(ops, Comparators.processTimeSort);
-      tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
-      tb.appendNanos((long) (processSum / ops.size()), null);
-      tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
-
-      Collections.sort(ops, Comparators.waitTimeSort);
-      tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
-      tb.appendNanos((long) (waitSum / ops.size()), null);
-      tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
-    }
-  }
-
-  static class Comparators {
-    final static Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
-      public int compare(MajorFragmentProfile o1, MajorFragmentProfile o2) {
-        return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
-      }
-    };
-
-    final static Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
-      public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
-        return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
-      }
-    };
-
-    final static Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
-      public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
-        return Long.compare(o1.getStartTime(), o2.getStartTime());
-      }
-    };
-
-    final static Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
-      public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
-        return Long.compare(o1.getEndTime(), o2.getEndTime());
-      }
-    };
-
-    final static Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
-      public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
-        return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
-      }
-    };
-
-    final static Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
-      public int compare(OperatorProfile o1, OperatorProfile o2) {
-        return Long.compare(o1.getOperatorId(), o2.getOperatorId());
-      }
-    };
-
-    final static Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-      public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
-        return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
-      }
-    };
-
-    final static Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-      public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
-        return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
-      }
-    };
-
-    final static Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-      public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
-        return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
-      }
-    };
-  }
-
-  private static class Filters {
-    final static Predicate<MinorFragmentProfile> hasOperators = new Predicate<MinorFragmentProfile>() {
-      public boolean apply(MinorFragmentProfile arg0) {
-        return arg0.getOperatorProfileCount() != 0;
-      }
-    };
-
-    final static Predicate<MinorFragmentProfile> hasTimes = new Predicate<MinorFragmentProfile>() {
-      public boolean apply(MinorFragmentProfile arg0) {
-        return arg0.hasStartTime() && arg0.hasEndTime();
-      }
-    };
-
-    final static Predicate<MinorFragmentProfile> hasOperatorsAndTimes = Predicates.and(Filters.hasOperators, Filters.hasTimes);
-
-    final static Predicate<MinorFragmentProfile> missingOperatorsOrTimes = Predicates.not(hasOperatorsAndTimes);
-  }
-
-  class TableBuilder {
-    NumberFormat format = NumberFormat.getInstance(Locale.US);
-    DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
-
-    StringBuilder sb;
-    int w = 0;
-    int width;
-
-    public TableBuilder(String[] columns) {
-      sb = new StringBuilder();
-      width = columns.length;
-
-      format.setMaximumFractionDigits(3);
-      format.setMinimumFractionDigits(3);
-
-      sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
-      for (String cn : columns) {
-        sb.append("<th>" + cn + "</th>");
-      }
-      sb.append("</tr>\n");
-    }
-
-    public void appendCell(String s, String link) {
-      if (w == 0) {
-        sb.append("<tr>");
-      }
-      sb.append(String.format("<td>%s%s</td>", s, link != null ? link : ""));
-      if (++w >= width) {
-        sb.append("</tr>\n");
-        w = 0;
-      }
-    }
-
-    public void appendRepeated(String s, String link, int n) {
-      for (int i = 0; i < n; i++) {
-        appendCell(s, link);
-      }
-    }
-
-    public void appendTime(long d, String link) {
-      appendCell(dateFormat.format(d), link);
-    }
-
-    public void appendMillis(long p, String link) {
-      appendCell(format.format(p / 1000.0), link);
-    }
-
-    public void appendNanos(long p, String link) {
-      appendMillis((long) (p / 1000.0 / 1000.0), link);
-    }
-
-    public void appendFormattedNumber(Number n, String link) {
-      appendCell(format.format(n), link);
-    }
-
-    public void appendInteger(long l, String link) {
-      appendCell(Long.toString(l), link);
-    }
-
-    @Override
-    public String toString() {
-      String rv;
-      rv = sb.append("\n</table>").toString();
-      sb = null;
-      return rv;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
new file mode 100644
index 0000000..e9024ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.Comparator;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+
+/**
+ * Shared orderings for the protobuf profile objects shown on the query profile page.
+ * Every comparator sorts ascending on a single numeric attribute.
+ */
+interface Comparators {
+  /** Orders major fragments by major fragment id. */
+  Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
+    @Override
+    public int compare(MajorFragmentProfile left, MajorFragmentProfile right) {
+      final long leftId = left.getMajorFragmentId();
+      final long rightId = right.getMajorFragmentId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Orders minor fragments by minor fragment id. */
+  Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftId = left.getMinorFragmentId();
+      final long rightId = right.getMinorFragmentId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Orders minor fragments by start time, earliest first. */
+  Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftStart = left.getStartTime();
+      final long rightStart = right.getStartTime();
+      return Long.compare(leftStart, rightStart);
+    }
+  };
+
+  /** Orders minor fragments by end time, earliest first. */
+  Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftEnd = left.getEndTime();
+      final long rightEnd = right.getEndTime();
+      return Long.compare(leftEnd, rightEnd);
+    }
+  };
+
+  /** Orders minor fragments by their reported maximum memory use, smallest first. */
+  Comparator<MinorFragmentProfile> fragPeakMemAllocated = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftMem = left.getMaxMemoryUsed();
+      final long rightMem = right.getMaxMemoryUsed();
+      return Long.compare(leftMem, rightMem);
+    }
+  };
+
+  /** Orders minor fragments by run time (end time minus start time), shortest first. */
+  Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftRunTime = left.getEndTime() - left.getStartTime();
+      final long rightRunTime = right.getEndTime() - right.getStartTime();
+      return Long.compare(leftRunTime, rightRunTime);
+    }
+  };
+
+  /** Orders operator profiles by operator id. */
+  Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
+    @Override
+    public int compare(OperatorProfile left, OperatorProfile right) {
+      final long leftId = left.getOperatorId();
+      final long rightId = right.getOperatorId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Orders (operator profile, integer) pairs by the operator's setup time in nanoseconds. */
+  Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final long leftSetup = left.getLeft().getSetupNanos();
+      final long rightSetup = right.getLeft().getSetupNanos();
+      return Long.compare(leftSetup, rightSetup);
+    }
+  };
+
+  /** Orders (operator profile, integer) pairs by the operator's process time in nanoseconds. */
+  Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final long leftProcess = left.getLeft().getProcessNanos();
+      final long rightProcess = right.getLeft().getProcessNanos();
+      return Long.compare(leftProcess, rightProcess);
+    }
+  };
+
+  /** Orders (operator profile, integer) pairs by the operator's wait time in nanoseconds. */
+  Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final long leftWait = left.getLeft().getWaitNanos();
+      final long rightWait = right.getLeft().getWaitNanos();
+      return Long.compare(leftWait, rightWait);
+    }
+  };
+
+  /** Orders (operator profile, integer) pairs by the operator's peak local memory allocation. */
+  Comparator<Pair<OperatorProfile, Integer>> opPeakMem = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final long leftPeak = left.getLeft().getPeakLocalMemoryAllocated();
+      final long rightPeak = right.getLeft().getPeakLocalMemoryAllocated();
+      return Long.compare(leftPeak, rightPeak);
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
new file mode 100644
index 0000000..7dbe947
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+/**
+ * Guava predicates over minor fragment profiles, used to separate the profiles that
+ * carry both operator data and timing data from those that do not.
+ */
+interface Filters {
+  /** Accepts a minor fragment profile whose operator profile count is non-zero. */
+  Predicate<MinorFragmentProfile> hasOperators = new Predicate<MinorFragmentProfile>() {
+    @Override
+    public boolean apply(MinorFragmentProfile profile) {
+      final int operatorCount = profile.getOperatorProfileCount();
+      return operatorCount != 0;
+    }
+  };
+
+  /** Accepts a minor fragment profile that has both a start time and an end time set. */
+  Predicate<MinorFragmentProfile> hasTimes = new Predicate<MinorFragmentProfile>() {
+    @Override
+    public boolean apply(MinorFragmentProfile profile) {
+      if (!profile.hasStartTime()) {
+        return false;
+      }
+      return profile.hasEndTime();
+    }
+  };
+
+  /** Accepts a profile only when both {@link #hasOperators} and {@link #hasTimes} accept it. */
+  Predicate<MinorFragmentProfile> hasOperatorsAndTimes = Predicates.and(hasOperators, hasTimes);
+
+  /** Exact complement of {@link #hasOperatorsAndTimes}. */
+  Predicate<MinorFragmentProfile> missingOperatorsOrTimes = Predicates.not(hasOperatorsAndTimes);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
new file mode 100644
index 0000000..3a66327
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+
+/**
+ * Presents one major fragment of a query profile as table content: a single summary row
+ * ({@link #addSummary(TableBuilder)}) and a table with one row per minor fragment
+ * ({@link #getContent()}).
+ */
+public class FragmentWrapper {
+  // Number of cells addSummary() writes after the two leading cells (fragment path and
+  // "reporting" count): first/last start, first/last end, min/avg/max run time, max peak memory.
+  private static final int SUMMARY_DATA_CELLS = 8;
+
+  private final MajorFragmentProfile major;
+  // Reference time subtracted from minor fragment start/end times before display.
+  // NOTE(review): presumably the query start time in the same unit as getStartTime() - confirm with caller.
+  private final long start;
+
+  /**
+   * @param major profile of the major fragment to present; must not be null
+   * @param start reference time that minor fragment start/end times are displayed relative to
+   * @throws NullPointerException if {@code major} is null
+   */
+  public FragmentWrapper(MajorFragmentProfile major, long start) {
+    this.major = Preconditions.checkNotNull(major);
+    this.start = start;
+  }
+
+  /** @return a label of the form "Major Fragment: &lt;path&gt;" for this fragment */
+  public String getDisplayName() {
+    return String.format("Major Fragment: %s", new OperatorPathBuilder().setMajor(major).build());
+  }
+
+  /** @return an identifier of the form "fragment-&lt;major fragment id&gt;" */
+  public String getId() {
+    return String.format("fragment-%s", major.getMajorFragmentId());
+  }
+
+  /**
+   * Appends this fragment's summary row to {@code tb}: fragment path, "complete / total" minor
+   * fragment count, then first/last start, first/last end, min/avg/max run time and the largest
+   * peak memory among the complete minor fragments. When no minor fragment is complete the data
+   * cells are emitted empty so that the row still has the same number of cells.
+   *
+   * @param tb table builder receiving the cells
+   */
+  public void addSummary(TableBuilder tb) {
+    final String fmt = " (%d)";
+    long t0 = start;
+
+    ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
+        Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
+
+    tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
+    tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
+
+    if (complete.size() < 1) {
+      // Must match the number of data cells written below, otherwise the row is left
+      // short and the next row's cells are appended to it.
+      tb.appendRepeated("", null, SUMMARY_DATA_CELLS);
+      return;
+    }
+
+    // After each ascending sort, index 0 holds the minimum and index li the maximum.
+    int li = complete.size() - 1;
+
+    Collections.sort(complete, Comparators.startTimeCompare);
+    tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    Collections.sort(complete, Comparators.endTimeCompare);
+    tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    long total = 0;
+    for (MinorFragmentProfile p : complete) {
+      total += p.getEndTime() - p.getStartTime();
+    }
+    Collections.sort(complete, Comparators.runTimeCompare);
+    tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
+        String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(total / complete.size(), null);
+    tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
+        String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    Collections.sort(complete, Comparators.fragPeakMemAllocated);
+    tb.appendBytes(complete.get(li).getMaxMemoryUsed(), null);
+  }
+
+  /** @return the per-minor-fragment table for the wrapped major fragment */
+  public String getContent() {
+    return majorFragmentTimingProfile(major);
+  }
+
+
+  /**
+   * Builds a table with one row per minor fragment of {@code major}. Minor fragments that have
+   * operators and start/end times get a full row (host, timings relative to the reference time,
+   * largest per-operator input record and batch totals, peak memory, state), ordered by minor
+   * fragment id. The remaining minor fragments get a row holding their id followed by their
+   * state repeated in every other column.
+   *
+   * @param major major fragment whose minor fragments are listed
+   * @return the rendered table as returned by {@code TableBuilder.toString()}
+   */
+  public String majorFragmentTimingProfile(MajorFragmentProfile major) {
+    final String[] columns = {"Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches", "Peak Memory", "State"};
+    TableBuilder builder = new TableBuilder(columns);
+
+    ArrayList<MinorFragmentProfile> complete, incomplete;
+    complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
+        major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
+    incomplete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
+        major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
+
+    Collections.sort(complete, Comparators.minorIdCompare);
+    for (MinorFragmentProfile minor : complete) {
+      ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
+
+      long t0 = start;
+      long biggestIncomingRecords = 0;
+      long biggestBatches = 0;
+
+      // For each operator, total the records/batches over all of its input streams and
+      // keep the largest total seen across the operators of this minor fragment.
+      for (OperatorProfile op : ops) {
+        long incomingRecords = 0;
+        long batches = 0;
+        for (StreamProfile sp : op.getInputProfileList()) {
+          incomingRecords += sp.getRecords();
+          batches += sp.getBatches();
+        }
+        biggestIncomingRecords = Math.max(biggestIncomingRecords, incomingRecords);
+        biggestBatches = Math.max(biggestBatches, batches);
+      }
+
+      builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build(), null);
+      builder.appendCell(minor.getEndpoint().getAddress(), null);
+      builder.appendMillis(minor.getStartTime() - t0, null);
+      builder.appendMillis(minor.getEndTime() - t0, null);
+      builder.appendMillis(minor.getEndTime() - minor.getStartTime(), null);
+
+      builder.appendFormattedInteger(biggestIncomingRecords, null);
+      builder.appendFormattedInteger(biggestBatches, null);
+      builder.appendBytes(minor.getMaxMemoryUsed(), null);
+      builder.appendCell(minor.getState().name(), null);
+    }
+    for (MinorFragmentProfile m : incomplete) {
+      builder.appendCell(
+          major.getMajorFragmentId() + "-"
+              + m.getMinorFragmentId(), null);
+      // Fill every remaining column so the row has exactly columns.length cells.
+      builder.appendRepeated(m.getState().toString(), null, columns.length - 1);
+    }
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
new file mode 100644
index 0000000..4cf378f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+
+/**
+ * Builds the {@code major-minor-operator} label that identifies a fragment or operator on the
+ * profile page, for example {@code 01-02-03}. Numeric components are zero-padded to at least
+ * two characters; components that were never set are rendered as {@code xx}.
+ */
+public class OperatorPathBuilder {
+  private static final String OPERATOR_PATH_PATTERN = "%s-%s-%s";
+  private static final String DEFAULT = "xx";
+  // Minimum number of characters a numeric component is padded to.
+  private static final int MIN_COMPONENT_WIDTH = 2;
+  private String major;
+  private String minor;
+  private String operator;
+
+  /** Creates a builder with all three components unset. */
+  public OperatorPathBuilder() {
+    clear();
+  }
+
+  /** Resets all three components to the unset placeholder. */
+  public void clear() {
+    major = DEFAULT;
+    minor = DEFAULT;
+    operator = DEFAULT;
+  }
+
+  /**
+   * Left-pads {@code text} with '0' up to two characters. Longer text is returned unchanged;
+   * it is never truncated, so ids of 100 and above keep all of their digits.
+   *
+   * @param text text to pad; must not be null
+   * @return {@code text} prefixed with as many zeros as needed to reach two characters
+   */
+  protected String leftPad(String text) {
+    if (text.length() >= MIN_COMPONENT_WIDTH) {
+      return text;
+    }
+    StringBuilder padded = new StringBuilder(MIN_COMPONENT_WIDTH);
+    for (int i = text.length(); i < MIN_COMPONENT_WIDTH; i++) {
+      padded.append('0');
+    }
+    return padded.append(text).toString();
+  }
+
+  /**
+   * Sets the major component from a profile's major fragment id.
+   *
+   * @param major source profile; when null the current value is kept
+   * @return this builder
+   */
+  public OperatorPathBuilder setMajor(MajorFragmentProfile major) {
+    if (major != null) {
+      return setMajor(major.getMajorFragmentId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the major component.
+   *
+   * @param newMajor major fragment id
+   * @return this builder
+   */
+  public OperatorPathBuilder setMajor(int newMajor) {
+    major = leftPad(String.valueOf(newMajor));
+    return this;
+  }
+
+  /**
+   * Sets the minor component from a profile's minor fragment id.
+   *
+   * @param minor source profile; when null the current value is kept
+   * @return this builder
+   */
+  public OperatorPathBuilder setMinor(MinorFragmentProfile minor) {
+    if (minor != null) {
+      return setMinor(minor.getMinorFragmentId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the minor component.
+   *
+   * @param newMinor minor fragment id
+   * @return this builder
+   */
+  public OperatorPathBuilder setMinor(int newMinor) {
+    minor = leftPad(String.valueOf(newMinor));
+    return this;
+  }
+
+  /**
+   * Sets the operator component from a profile's operator id.
+   *
+   * @param op source profile; when null the current value is kept
+   * @return this builder
+   */
+  public OperatorPathBuilder setOperator(OperatorProfile op) {
+    if (op != null) {
+      return setOperator(op.getOperatorId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the operator component.
+   *
+   * @param newOp operator id
+   * @return this builder
+   */
+  public OperatorPathBuilder setOperator(int newOp) {
+    operator = leftPad(String.valueOf(newOp));
+    return this;
+  }
+
+  /** @return the three components joined with '-' in major, minor, operator order */
+  public String build() {
+    return String.format(OPERATOR_PATH_PATTERN, major, minor, operator);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
new file mode 100644
index 0000000..4f4fcdb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+
+/**
+ * Presents the profiles of one operator, gathered from the minor fragments of a major fragment,
+ * as a per-minor-fragment table ({@link #getContent()}) and as a single summary row
+ * ({@link #addSummary(TableBuilder)}).
+ */
+public class OperatorWrapper {
+  // Label shown when the operator type number does not map to a CoreOperatorType.
+  private static final String UNKNOWN_OPERATOR = "UNKNOWN_OPERATOR";
+
+  private final int major;
+  // Each pair is (operator profile, id of the minor fragment that reported it).
+  private final List<ImmutablePair<OperatorProfile, Integer>> ops;
+
+  /**
+   * @param major id of the major fragment the operator belongs to
+   * @param ops non-empty list of (operator profile, minor fragment id) pairs; the list is kept by
+   *          reference and is re-sorted in place by {@link #addSummary(TableBuilder)}
+   */
+  public OperatorWrapper(int major, List<ImmutablePair<OperatorProfile, Integer>> ops) {
+    assert ops.size() > 0;
+    this.major = major;
+    this.ops = ops;
+  }
+
+  /** @return "&lt;path&gt; - &lt;operator type&gt;", taken from the first profile in the list */
+  public String getDisplayName() {
+    OperatorProfile op = ops.get(0).getLeft();
+    String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
+    return String.format("%s - %s", path, operatorTypeName(op));
+  }
+
+  /** @return an identifier of the form "operator-&lt;major id&gt;-&lt;operator id&gt;" */
+  public String getId() {
+    return String.format("operator-%d-%d", major, ops.get(0).getLeft().getOperatorId());
+  }
+
+  /**
+   * Builds a table with one row per minor fragment: operator path, setup/process/wait times,
+   * the largest batch and record counts seen on any single input stream, and peak memory.
+   * Rows follow the current order of the backing list.
+   *
+   * @return the rendered table as returned by {@code TableBuilder.toString()}
+   */
+  public String getContent() {
+    final String [] columns = {"Minor Fragment", "Setup", "Process", "Wait", "Max Batches", "Max Records", "Peak Mem"};
+    TableBuilder builder = new TableBuilder(columns);
+
+    for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
+      int minor = ip.getRight();
+      OperatorProfile op = ip.getLeft();
+
+      String path = new OperatorPathBuilder().setMajor(major).setMinor(minor).setOperator(op).build();
+      builder.appendCell(path, null);
+      builder.appendNanos(op.getSetupNanos(), null);
+      builder.appendNanos(op.getProcessNanos(), null);
+      builder.appendNanos(op.getWaitNanos(), null);
+
+      // Start from 0 so that an operator without input streams reports 0 instead of
+      // a Long.MIN_VALUE sentinel leaking into the table.
+      long maxBatches = 0;
+      long maxRecords = 0;
+      for (StreamProfile sp : op.getInputProfileList()) {
+        maxBatches = Math.max(sp.getBatches(), maxBatches);
+        maxRecords = Math.max(sp.getRecords(), maxRecords);
+      }
+
+      builder.appendFormattedInteger(maxBatches, null);
+      builder.appendFormattedInteger(maxRecords, null);
+      builder.appendBytes(op.getPeakLocalMemoryAllocated(), null);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Appends this operator's summary row to {@code tb}: operator path, operator type, then
+   * min/avg/max of setup, process and wait time (min and max annotated with the minor fragment
+   * id they came from), followed by average and maximum peak memory.
+   * <p>
+   * Side effect: the backing list is sorted in place several times and is left ordered by peak
+   * memory.
+   *
+   * @param tb table builder receiving the cells
+   */
+  public void addSummary(TableBuilder tb) {
+    OperatorProfile op = ops.get(0).getLeft();
+    String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
+    tb.appendCell(path, null);
+    tb.appendCell(operatorTypeName(op), null);
+
+    // After each ascending sort, index 0 holds the minimum and index li the maximum.
+    int li = ops.size() - 1;
+    String fmt = " (%s)";
+
+    double setupSum = 0.0;
+    double processSum = 0.0;
+    double waitSum = 0.0;
+    double memSum = 0.0;
+    for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
+      setupSum += ip.getLeft().getSetupNanos();
+      processSum += ip.getLeft().getProcessNanos();
+      waitSum += ip.getLeft().getWaitNanos();
+      memSum += ip.getLeft().getPeakLocalMemoryAllocated();
+    }
+
+    Collections.sort(ops, Comparators.setupTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (setupSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.processTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (processSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.waitTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (waitSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.opPeakMem);
+    tb.appendBytes((long) (memSum / ops.size()), null);
+    tb.appendBytes(ops.get(li).getLeft().getPeakLocalMemoryAllocated(), null);
+  }
+
+  // Maps the profile's operator type number to its enum name, or to UNKNOWN_OPERATOR when the
+  // lookup yields null. Shared so the display name and the summary row use the same label.
+  private static String operatorTypeName(OperatorProfile op) {
+    CoreOperatorType operatorType = CoreOperatorType.valueOf(op.getOperatorType());
+    return operatorType == null ? UNKNOWN_OPERATOR : operatorType.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
new file mode 100644
index 0000000..ae04bad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryStatus;
+import org.glassfish.jersey.server.mvc.Viewable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * REST endpoints for listing query profiles, rendering a single profile (as HTML or JSON) and
+ * cancelling a query.
+ */
+@Path("/")
+public class ProfileResources {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
+
+  @Inject WorkManager work;
+
+  /**
+   * One row of the profile listing.  The natural ordering is by start time, oldest first.
+   */
+  public static class ProfileInfo implements Comparable<ProfileInfo> {
+    // SimpleDateFormat is not thread-safe; anyone using this shared instance must synchronize on it.
+    public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+
+    /** Longest query text kept for display; anything longer is truncated. */
+    private static final int MAX_QUERY_LENGTH = 150;
+
+    private String queryId;
+    private Date time;
+    private String location;
+    private String foreman;
+    private String query;
+    private String state;
+
+    /**
+     * @param queryId string form of the query id
+     * @param time    query start, passed straight to {@link Date#Date(long)}
+     * @param foreman address of the node acting as foreman for the query
+     * @param query   query text; only the first 150 characters are retained
+     * @param state   name of the query state
+     */
+    public ProfileInfo(String queryId, long time, String foreman, String query, String state) {
+      this.queryId = queryId;
+      this.time = new Date(time);
+      this.foreman = foreman;
+      // The JSON endpoint in this class is /profiles/{queryid}.json, so the link must use "profiles".
+      // NOTE(review): host and port are still hard-coded; confirm whether they should be derived.
+      this.location = "http://localhost:8047/profiles/" + queryId + ".json";
+      this.query = query.substring(0, Math.min(query.length(), MAX_QUERY_LENGTH));
+      this.state = state;
+    }
+
+    public String getQuery() {
+      return query;
+    }
+
+    public String getQueryId() {
+      return queryId;
+    }
+
+    /**
+     * @return the start time rendered as MM/dd/yyyy HH:mm:ss
+     */
+    public String getTime() {
+      // Requests are served concurrently and the formatter is shared, so guard it.
+      synchronized (format) {
+        return format.format(time);
+      }
+    }
+
+    public String getState() {
+      return state;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    @Override
+    public int compareTo(ProfileInfo other) {
+      return time.compareTo(other.time);
+    }
+
+    public String getForeman() {
+      return foreman;
+    }
+
+  }
+
+  private PStoreProvider provider() {
+    return work.getContext().getPersistentStoreProvider();
+  }
+
+  /**
+   * Payload of the profile listing: running and finished queries, each newest first.
+   */
+  @XmlRootElement
+  public class QProfiles {
+    private List<ProfileInfo> runningQueries;
+    private List<ProfileInfo> finishedQueries;
+
+    public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries) {
+      this.runningQueries = runningQueries;
+      this.finishedQueries = finishedQueries;
+    }
+
+    public List<ProfileInfo> getRunningQueries() {
+      return runningQueries;
+    }
+
+    public List<ProfileInfo> getFinishedQueries() {
+      return finishedQueries;
+    }
+  }
+
+  /**
+   * Lists running and completed queries.
+   *
+   * @return both lists sorted newest first; two empty lists if the stores cannot be opened
+   */
+  @GET
+  @Path("/profiles.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  public QProfiles getProfilesJSON() {
+    PStore<QueryProfile> completed;
+    PStore<QueryInfo> running;
+    try {
+      completed = provider().getStore(QueryStatus.QUERY_PROFILE);
+      running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+    } catch (IOException e) {
+      logger.debug("Failed to get profiles from persistent or ephemeral store.", e);
+      return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
+    }
+
+    List<ProfileInfo> runningQueries = Lists.newArrayList();
+    for (Map.Entry<String, QueryInfo> entry : running) {
+      QueryInfo info = entry.getValue();
+      runningQueries.add(new ProfileInfo(entry.getKey(), info.getStart(), info.getForeman().getAddress(), info.getQuery(), info.getState().name()));
+    }
+    Collections.sort(runningQueries, Collections.reverseOrder());
+
+    List<ProfileInfo> finishedQueries = Lists.newArrayList();
+    for (Map.Entry<String, QueryProfile> entry : completed) {
+      QueryProfile profile = entry.getValue();
+      finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
+    }
+    // Keep the finished list in the same newest-first order as the running list.
+    Collections.sort(finishedQueries, Collections.reverseOrder());
+
+    return new QProfiles(runningQueries, finishedQueries);
+  }
+
+  @GET
+  @Path("/profiles")
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getProfiles() {
+    QProfiles profiles = getProfilesJSON();
+    return new Viewable("/rest/profile/list.ftl", profiles);
+  }
+
+  /**
+   * Looks a profile up in three places, in order: a foreman on this node, the foreman recorded
+   * in the running-query store, and finally the completed-profile store.
+   *
+   * @return the profile, or {@link QueryProfile#getDefaultInstance()} when none is found
+   */
+  private QueryProfile getQueryProfile(String queryId) {
+    QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
+
+    // first check local running
+    Foreman f = work.getBee().getForemanForQueryId(id);
+    if (f != null) {
+      return f.getQueryStatus().getAsProfile();
+    }
+
+    // then check remote running
+    try {
+      PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+      QueryInfo info = runningQueries.get(queryId);
+      // A missing entry means the query is not running; skip ahead rather than tripping an NPE.
+      if (info != null) {
+        return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS);
+      }
+    } catch (Exception e) {
+      logger.debug("Failure to find query as running profile.", e);
+    }
+
+    // then check blob store
+    try {
+      PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE);
+      QueryProfile stored = profiles.get(queryId);
+      // Never hand null to callers; they wrap or serialize the result unconditionally.
+      if (stored != null) {
+        return stored;
+      }
+    } catch (Exception e) {
+      logger.warn("Failure to load query profile for query {}", queryId, e);
+    }
+
+    // TODO: Improve error messaging.
+    return QueryProfile.getDefaultInstance();
+  }
+
+  /**
+   * @return the serialized profile, or a small JSON error object if serialization fails
+   */
+  @GET
+  @Path("/profiles/{queryid}.json")
+  @Produces(MediaType.APPLICATION_JSON)
+  public String getProfileJSON(@PathParam("queryid") String queryId) {
+    try {
+      byte[] serialized = QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId));
+      // Decode explicitly instead of relying on the platform default charset.
+      // NOTE(review): assumes the serializer emits UTF-8 encoded JSON - confirm.
+      return new String(serialized, java.nio.charset.StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      logger.debug("Failed to serialize profile for: " + queryId, e);
+      // JSON strings must be double-quoted for the body to match the declared media type.
+      return "{ \"message\" : \"error (unable to serialize profile)\" }";
+    }
+  }
+
+  @GET
+  @Path("/profiles/{queryid}")
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getProfile(@PathParam("queryid") String queryId) {
+    ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
+    return new Viewable("/rest/profile/profile.ftl", wrapper);
+  }
+
+  /**
+   * Cancels a query, first through a foreman on this node and otherwise through the foreman
+   * recorded in the running-query store.
+   *
+   * @return a plain-text description of the outcome
+   */
+  @GET
+  @Path("/profiles/cancel/{queryid}")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+
+    QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
+
+    // first check local running
+    Foreman f = work.getBee().getForemanForQueryId(id);
+    if (f != null) {
+      f.cancel();
+      return String.format("Cancelled query %s on locally running node.", queryId);
+    }
+
+    // then check remote running
+    try {
+      PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+      QueryInfo info = runningQueries.get(queryId);
+      if (info != null) {
+        Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
+        if (a.getOk()) {
+          return String.format("Query %s canceled on node %s.", queryId, info.getForeman().getAddress());
+        }
+        return String.format("Attempted to cancel query %s on %s but the query is no longer active on that node.", queryId, info.getForeman().getAddress());
+      }
+    } catch (Exception e) {
+      logger.debug("Failure to find query as running profile.", e);
+    }
+
+    // Reached when the store has no entry for the query or the lookup/RPC failed.
+    return String.format("Failure attempting to cancel query %s.  Unable to find information about where query is actively running.", queryId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
new file mode 100644
index 0000000..80016aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * View model around a {@link QueryProfile}: exposes the profile grouped by operator and by
+ * major fragment, plus pre-rendered HTML overview tables.
+ */
+public class ProfileWrapper {
+
+  public QueryProfile profile;
+  public String id;
+
+  public ProfileWrapper(QueryProfile profile) {
+    this.profile = profile;
+    this.id = QueryIdHelper.getQueryId(profile.getId());
+  }
+
+  public QueryProfile getProfile() {
+    return profile;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getQueryId() {
+    return QueryIdHelper.getQueryId(profile.getId());
+  }
+
+  /**
+   * Groups operator profiles by (major fragment id, operator id), collecting one entry per minor
+   * fragment in which the operator appears.
+   *
+   * @return one wrapper per (major fragment, operator) pair, in ascending key order
+   */
+  public List<OperatorWrapper> getOperatorProfiles() {
+    List<OperatorWrapper> ows = Lists.newArrayList();
+    Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
+
+    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    Collections.sort(majors, Comparators.majorIdCompare);
+    for (MajorFragmentProfile major : majors) {
+
+      List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
+      Collections.sort(minors, Comparators.minorIdCompare);
+      for (MinorFragmentProfile minor : minors) {
+
+        List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
+        Collections.sort(ops, Comparators.operatorIdCompare);
+        for (OperatorProfile op : ops) {
+
+          ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
+              major.getMajorFragmentId(), op.getOperatorId());
+          // One lookup; the bucket is created the first time a key is seen.
+          List<ImmutablePair<OperatorProfile, Integer>> bucket = opmap.get(ip);
+          if (bucket == null) {
+            bucket = Lists.newArrayList();
+            opmap.put(ip, bucket);
+          }
+          bucket.add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
+        }
+      }
+    }
+
+    // HashMap iteration order is unspecified, so sort the keys before emitting wrappers.
+    List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
+    Collections.sort(keys);
+
+    for (ImmutablePair<Integer, Integer> ip : keys) {
+      ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
+    }
+
+    return ows;
+  }
+
+  /**
+   * @return one wrapper per major fragment, ordered by {@code Comparators.majorIdCompare}
+   */
+  public List<FragmentWrapper> getFragmentProfiles() {
+    List<FragmentWrapper> fws = Lists.newArrayList();
+
+    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    Collections.sort(majors, Comparators.majorIdCompare);
+    for (MajorFragmentProfile major : majors) {
+      fws.add(new FragmentWrapper(major, profile.getStart()));
+    }
+
+    return fws;
+  }
+
+  /**
+   * @return an HTML table with one summary row per major fragment
+   */
+  public String getFragmentsOverview() {
+    final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax", "memmax"};
+    TableBuilder tb = new TableBuilder(columns);
+    for (FragmentWrapper fw : getFragmentProfiles()) {
+      fw.addSummary(tb);
+    }
+    return tb.toString();
+  }
+
+  /**
+   * @return an HTML table with one summary row per operator
+   */
+  public String getOperatorsOverview() {
+    final String[] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)", "Mem (avg)", "Mem (max)"};
+    TableBuilder tb = new TableBuilder(columns);
+    for (OperatorWrapper ow : getOperatorProfiles()) {
+      ow.addSummary(tb);
+    }
+    return tb.toString();
+  }
+
+  /**
+   * @return a JSON object mapping each operator type's numeric id (as a string key) to its name
+   */
+  public String getOperatorsJSON() {
+    StringBuilder sb = new StringBuilder("{");
+    String sep = "";
+    for (CoreOperatorType op : CoreOperatorType.values()) {
+      // Key by the protobuf number rather than ordinal(): the two only coincide while the enum
+      // stays densely numbered from zero in declaration order.  Plain appends also keep the
+      // digits independent of the default locale.
+      sb.append(sep).append('"').append(op.getNumber()).append("\" : \"").append(op).append('"');
+      sep = ", ";
+    }
+    return sb.append("}").toString();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
new file mode 100644
index 0000000..72f4436
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+class TableBuilder {
+  NumberFormat format = NumberFormat.getInstance(Locale.US);
+  SimpleDateFormat hours = new SimpleDateFormat("HH:mm");
+  SimpleDateFormat shours = new SimpleDateFormat("H:mm");
+  SimpleDateFormat mins = new SimpleDateFormat("mm:ss");
+  SimpleDateFormat smins = new SimpleDateFormat("m:ss");
+
+  SimpleDateFormat secs = new SimpleDateFormat("ss.SSS");
+  SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS");
+  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
+  DecimalFormat dec = new DecimalFormat("0.00");
+  DecimalFormat intformat = new DecimalFormat("#,###");
+
+  StringBuilder sb;
+  int w = 0;
+  int width;
+
+  public TableBuilder(String[] columns) {
+    sb = new StringBuilder();
+    width = columns.length;
+
+    format.setMaximumFractionDigits(3);
+    format.setMinimumFractionDigits(3);
+
+    sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
+    for (String cn : columns) {
+      sb.append("<th>" + cn + "</th>");
+    }
+    sb.append("</tr>\n");
+  }
+
+  public void appendCell(String s, String link) {
+    if (w == 0) {
+      sb.append("<tr>");
+    }
+    sb.append(String.format("<td>%s%s</td>", s, link != null ? link : ""));
+    if (++w >= width) {
+      sb.append("</tr>\n");
+      w = 0;
+    }
+  }
+
+  public void appendRepeated(String s, String link, int n) {
+    for (int i = 0; i < n; i++) {
+      appendCell(s, link);
+    }
+  }
+
+  public void appendTime(long d, String link) {
+    appendCell(dateFormat.format(d), link);
+  }
+
+  public void appendMillis(long p, String link) {
+    double secs = p/1000.0;
+    double mins = secs/60;
+    double hours = mins/60;
+    SimpleDateFormat timeFormat = null;
+    if(hours >= 10){
+      timeFormat = this.hours;
+    }else if(hours >= 1){
+      timeFormat = this.shours;
+    }else if (mins >= 10){
+      timeFormat = this.mins;
+    }else if (mins >= 1){
+      timeFormat = this.smins;
+    }else if (secs >= 10){
+      timeFormat = this.secs;
+    }else {
+      timeFormat = this.ssecs;
+    }
+    appendCell(timeFormat.format(new Date(p)), null);
+  }
+
+  public void appendNanos(long p, String link) {
+    appendMillis((long) (p / 1000.0 / 1000.0), link);
+  }
+
+  public void appendFormattedNumber(Number n, String link) {
+    appendCell(format.format(n), link);
+  }
+
+  public void appendFormattedInteger(long n, String link) {
+    appendCell(intformat.format(n), link);
+  }
+
+  public void appendInteger(long l, String link) {
+    appendCell(Long.toString(l), link);
+  }
+
+  public void appendBytes(long l, String link){
+    appendCell(bytePrint(l), link);
+  }
+
+  private String bytePrint(long size){
+    double m = size/Math.pow(1024, 2);
+    double g = size/Math.pow(1024, 3);
+    double t = size/Math.pow(1024, 4);
+    if (t > 1) {
+      return dec.format(t).concat("TB");
+    } else if (g > 1) {
+      return dec.format(g).concat("GB");
+    } else if (m > 1){
+      return intformat.format(m).concat("MB");
+    } else {
+      return "-";
+    }
+  }
+
+  @Override
+  public String toString() {
+    String rv;
+    rv = sb.append("\n</table>").toString();
+    sb = null;
+    return rv;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index b33042b..378e81a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -486,7 +486,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
       case AWAITING_ALLOCATION:
       case RUNNING:
         if(data.isLocal()){
-          rootRunner.cancel();
+          if(rootRunner != null){
+            rootRunner.cancel();
+          }
         }else{
           bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 8fd4b97..1b0885d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -106,7 +106,7 @@ public abstract class AbstractStatusReporter implements StatusReporter{
 
   @Override
   public final void fail(FragmentHandle handle, String message, Throwable excep) {
-    FragmentStatus.Builder status = getBuilder(FragmentState.FAILED, message, excep);
+    FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, message, excep);
     fail(handle, status);
   }