You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/02/05 02:30:27 UTC
[2/2] drill git commit: DRILL-2114: Update profile page to provide query fragment state, node where query was run, and memory information. Also, extract giant class into smaller units.
DRILL-2114: Update profile page to provide query fragment state, node where query was run, and memory information. Also, extract giant class into smaller units.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a9ee7911
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a9ee7911
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a9ee7911
Branch: refs/heads/master
Commit: a9ee79110450d3a68cb3600552207226594e5b58
Parents: 6331d35
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Jan 14 07:41:41 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Feb 4 16:28:03 2015 -0800
----------------------------------------------------------------------
.../apache/drill/exec/ops/FragmentContext.java | 6 +-
.../apache/drill/exec/ops/FragmentStats.java | 5 +-
.../drill/exec/server/rest/DrillRestServer.java | 1 +
.../exec/server/rest/ProfileResources.java | 260 ---------
.../drill/exec/server/rest/ProfileWrapper.java | 541 -------------------
.../exec/server/rest/profile/Comparators.java | 93 ++++
.../drill/exec/server/rest/profile/Filters.java | 41 ++
.../server/rest/profile/FragmentWrapper.java | 141 +++++
.../rest/profile/OperatorPathBuilder.java | 86 +++
.../server/rest/profile/OperatorWrapper.java | 117 ++++
.../server/rest/profile/ProfileResources.java | 260 +++++++++
.../server/rest/profile/ProfileWrapper.java | 138 +++++
.../exec/server/rest/profile/TableBuilder.java | 142 +++++
.../apache/drill/exec/work/foreman/Foreman.java | 4 +-
.../work/fragment/AbstractStatusReporter.java | 2 +-
15 files changed, 1031 insertions(+), 806 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index dc47f4e..e413921 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -88,7 +88,7 @@ public class FragmentContext implements Closeable {
public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
- this.stats = new FragmentStats(dbContext.getMetrics(), fragment.getAssignment());
+
this.context = dbContext;
this.connection = connection;
this.fragment = fragment;
@@ -109,14 +109,16 @@ public class FragmentContext implements Closeable {
} catch (Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
+
// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
try {
- this.allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
+ this.allocator = dbContext.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
assert (allocator != null);
}catch(Throwable e){
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}
+ this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 4431235..b6b0a8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -32,14 +32,17 @@ public class FragmentStats {
private List<OperatorStats> operators = Lists.newArrayList();
private final long startTime;
private final DrillbitEndpoint endpoint;
+ private final BufferAllocator allocator;
- public FragmentStats(MetricRegistry metrics, DrillbitEndpoint endpoint) {
+ public FragmentStats(BufferAllocator allocator, MetricRegistry metrics, DrillbitEndpoint endpoint) {
this.startTime = System.currentTimeMillis();
this.endpoint = endpoint;
+ this.allocator = allocator;
}
public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) {
prfB.setStartTime(startTime);
+ prfB.setMaxMemoryUsed(allocator.getPeakMemoryAllocation());
prfB.setEndTime(System.currentTimeMillis());
prfB.setEndpoint(endpoint);
for(OperatorStats o : operators){
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index e8cb4ba..147e4a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.server.rest;
+import org.apache.drill.exec.server.rest.profile.ProfileResources;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.WorkManager;
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
deleted file mode 100644
index 58b3d4e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.server.rest;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.WorkManager;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
-import org.glassfish.jersey.server.mvc.Viewable;
-
-import com.google.common.collect.Lists;
-
-@Path("/")
-public class ProfileResources {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
-
- @Inject WorkManager work;
-
- public static class ProfileInfo implements Comparable<ProfileInfo> {
- public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
-
- private String queryId;
- private Date time;
- private String location;
- private String foreman;
- private String query;
- private String state;
-
- public ProfileInfo(String queryId, long time, String foreman, String query, String state) {
- this.queryId = queryId;
- this.time = new Date(time);
- this.foreman = foreman;
- this.location = "http://localhost:8047/profile/" + queryId + ".json";
- this.query = query = query.substring(0, Math.min(query.length(), 150));
- this.state = state;
- }
-
- public String getQuery(){
- return query;
- }
-
- public String getQueryId() {
- return queryId;
- }
-
- public String getTime() {
- return format.format(time);
- }
-
-
- public String getState() {
- return state;
- }
-
- public String getLocation() {
- return location;
- }
-
- @Override
- public int compareTo(ProfileInfo other) {
- return time.compareTo(other.time);
- }
-
- public String getForeman() {
- return foreman;
- }
-
- }
-
- private PStoreProvider provider(){
- return work.getContext().getPersistentStoreProvider();
- }
-
- @XmlRootElement
- public class QProfiles {
- private List<ProfileInfo> runningQueries;
- private List<ProfileInfo> finishedQueries;
-
- public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries) {
- this.runningQueries = runningQueries;
- this.finishedQueries = finishedQueries;
- }
-
- public List<ProfileInfo> getRunningQueries() {
- return runningQueries;
- }
-
- public List<ProfileInfo> getFinishedQueries() {
- return finishedQueries;
- }
- }
-
- @GET
- @Path("/profiles.json")
- @Produces(MediaType.APPLICATION_JSON)
- public QProfiles getProfilesJSON() {
- PStore<QueryProfile> completed = null;
- PStore<QueryInfo> running = null;
- try {
- completed = provider().getStore(QueryStatus.QUERY_PROFILE);
- running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
- } catch (IOException e) {
- logger.debug("Failed to get profiles from persistent or ephemeral store.");
- return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
- }
-
- List<ProfileInfo> runningQueries = Lists.newArrayList();
-
- for (Map.Entry<String, QueryInfo> entry : running) {
- QueryInfo profile = entry.getValue();
- runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
- }
-
- Collections.sort(runningQueries, Collections.reverseOrder());
-
-
- List<ProfileInfo> finishedQueries = Lists.newArrayList();
- for (Map.Entry<String, QueryProfile> entry : completed) {
- QueryProfile profile = entry.getValue();
- finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
- }
-
- return new QProfiles(runningQueries, finishedQueries);
- }
-
- @GET
- @Path("/profiles")
- @Produces(MediaType.TEXT_HTML)
- public Viewable getProfiles() {
- QProfiles profiles = getProfilesJSON();
- return new Viewable("/rest/profile/list.ftl", profiles);
- }
-
- private QueryProfile getQueryProfile(String queryId) {
- QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
-
- // first check local running
- Foreman f = work.getBee().getForemanForQueryId(id);
- if(f != null){
- return f.getQueryStatus().getAsProfile();
- }
-
- // then check remote running
- try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
- QueryInfo info = runningQueries.get(queryId);
- return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS);
- }catch(Exception e){
- logger.debug("Failure to find query as running profile.", e);
- }
-
- // then check blob store
- try{
- PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE);
- return profiles.get(queryId);
- }catch(Exception e){
- logger.warn("Failure to load query profile for query {}", queryId, e);
- }
-
- // TODO: Improve error messaging.
- return QueryProfile.getDefaultInstance();
-
- }
-
-
- @GET
- @Path("/profiles/{queryid}.json")
- @Produces(MediaType.APPLICATION_JSON)
- public String getProfileJSON(@PathParam("queryid") String queryId) {
- try {
- return new String(QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
- } catch (IOException e) {
- logger.debug("Failed to serialize profile for: " + queryId);
- return ("{ 'message' : 'error (unable to serialize profile)' }");
- }
- }
-
- @GET
- @Path("/profiles/{queryid}")
- @Produces(MediaType.TEXT_HTML)
- public Viewable getProfile(@PathParam("queryid") String queryId) {
- ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
-
- return new Viewable("/rest/profile/profile.ftl", wrapper);
-
- }
-
-
- @GET
- @Path("/profiles/cancel/{queryid}")
- @Produces(MediaType.TEXT_PLAIN)
- public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
-
- QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
-
- // first check local running
- Foreman f = work.getBee().getForemanForQueryId(id);
- if(f != null){
- f.cancel();
- return String.format("Cancelled query %s on locally running node.", queryId);
- }
-
- // then check remote running
- try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
- QueryInfo info = runningQueries.get(queryId);
- Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
- if(a.getOk()){
- return String.format("Query %s canceled on node %s.", queryId, info.getForeman().getAddress());
- }else{
- return String.format("Attempted to cancel query %s on %s but the query is no longer active on that node.", queryId, info.getForeman().getAddress());
- }
- }catch(Exception e){
- logger.debug("Failure to find query as running profile.", e);
- return String.format("Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
deleted file mode 100644
index 80c08d3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.server.rest;
-
-import java.text.DateFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class ProfileWrapper {
-
- public QueryProfile profile;
- public String id;
-
- public ProfileWrapper(QueryProfile profile) {
- this.profile = profile;
- this.id = QueryIdHelper.getQueryId(profile.getId());
- }
-
- public QueryProfile getProfile() {
- return profile;
- }
-
- public String getId() {
- return id;
- }
-
- public String getQueryId() {
- return QueryIdHelper.getQueryId(profile.getId());
- }
-
- public List<OperatorWrapper> getOperatorProfiles() {
- List<OperatorWrapper> ows = Lists.newArrayList();
- Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
-
- List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
- Collections.sort(majors, Comparators.majorIdCompare);
- for (MajorFragmentProfile major : majors) {
-
- List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
- Collections.sort(minors, Comparators.minorIdCompare);
- for (MinorFragmentProfile minor : minors) {
-
- List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
- Collections.sort(ops, Comparators.operatorIdCompare);
- for (OperatorProfile op : ops) {
-
- ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
- major.getMajorFragmentId(), op.getOperatorId());
- if (!opmap.containsKey(ip)) {
- List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
- opmap.put(ip, l);
- }
- opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
- }
- }
- }
-
- List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
- Collections.sort(keys);
- ImmutablePair<OperatorProfile, Integer> val;
- for (ImmutablePair<Integer, Integer> ip : keys) {
- ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
- }
-
- return ows;
- }
-
- public List<FragmentWrapper> getFragmentProfiles() {
- List<FragmentWrapper> fws = Lists.newArrayList();
-
- List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
- Collections.sort(majors, Comparators.majorIdCompare);
- for (MajorFragmentProfile major : majors) {
- fws.add(new FragmentWrapper(major));
- }
-
- return fws;
- }
-
- public String getFragmentsOverview() {
- final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax"};
- TableBuilder tb = new TableBuilder(columns);
- for (FragmentWrapper fw : getFragmentProfiles()) {
- fw.addSummary(tb);
- }
- return tb.toString();
- }
-
- public String majorFragmentTimingProfile(MajorFragmentProfile major) {
- final String[] columns = {"Minor Fragment", "Start", "End", "Total Time", "Max Records", "Max Batches"};
- TableBuilder builder = new TableBuilder(columns);
-
- ArrayList<MinorFragmentProfile> complete, incomplete;
- complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
- major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
- incomplete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
- major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
-
- Collections.sort(complete, Comparators.minorIdCompare);
- for (MinorFragmentProfile minor : complete) {
- ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
-
- long t0 = profile.getStart();
- long biggestIncomingRecords = 0;
- long biggestBatches = 0;
-
- for (OperatorProfile op : ops) {
- long incomingRecords = 0;
- long batches = 0;
- for (StreamProfile sp : op.getInputProfileList()) {
- incomingRecords += sp.getRecords();
- batches += sp.getBatches();
- }
- biggestIncomingRecords = Math.max(biggestIncomingRecords, incomingRecords);
- biggestBatches = Math.max(biggestBatches, batches);
- }
-
- builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build(), null);
- builder.appendMillis(minor.getStartTime() - t0, null);
- builder.appendMillis(minor.getEndTime() - t0, null);
- builder.appendMillis(minor.getEndTime() - minor.getStartTime(), null);
-
- builder.appendInteger(biggestIncomingRecords, null);
- builder.appendInteger(biggestBatches, null);
- }
- for (MinorFragmentProfile m : incomplete) {
- builder.appendCell(
- major.getMajorFragmentId() + "-"
- + m.getMinorFragmentId(), null);
- builder.appendRepeated(m.getState().toString(), null, 5);
- }
- return builder.toString();
- }
-
- public String getOperatorsOverview() {
- final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)"};
- TableBuilder tb = new TableBuilder(columns);
- for (OperatorWrapper ow : getOperatorProfiles()) {
- ow.addSummary(tb);
- }
- return tb.toString();
- }
-
- public String getOperatorsJSON() {
- StringBuilder sb = new StringBuilder("{");
- String sep = "";
- for (CoreOperatorType op : CoreOperatorType.values()) {
- sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
- sep = ", ";
- }
- return sb.append("}").toString();
- }
-
- private static class OperatorPathBuilder {
- private static final String OPERATOR_PATH_PATTERN = "%s-%s-%s";
- private static final String DEFAULT = "xx";
- private String major;
- private String minor;
- private String operator;
-
- public OperatorPathBuilder() {
- clear();
- }
-
- public void clear() {
- major = DEFAULT;
- minor = DEFAULT;
- operator = DEFAULT;
- }
-
- // Utility to left pad strings
- protected String leftPad(String text) {
- return String.format("00%s", text).substring(text.length());
- }
-
- public OperatorPathBuilder setMajor(MajorFragmentProfile major) {
- if (major!=null) {
- return setMajor(major.getMajorFragmentId());
- }
- return this;
- }
-
- public OperatorPathBuilder setMajor(int newMajor) {
- major = leftPad(String.valueOf(newMajor));
- return this;
- }
-
- public OperatorPathBuilder setMinor(MinorFragmentProfile minor) {
- if (minor!=null) {
- return setMinor(minor.getMinorFragmentId());
- }
- return this;
- }
-
- public OperatorPathBuilder setMinor(int newMinor) {
- minor = leftPad(String.valueOf(newMinor));
- return this;
- }
-
- public OperatorPathBuilder setOperator(OperatorProfile op) {
- if (op!=null) {
- return setOperator(op.getOperatorId());
- }
- return this;
- }
-
- public OperatorPathBuilder setOperator(int newOp) {
- operator = leftPad(String.valueOf(newOp));
- return this;
- }
-
- public String build() {
- StringBuffer sb = new StringBuffer();
- return sb.append(major).append("-")
- .append(minor).append("-")
- .append(operator)
- .toString();
- }
- }
-
- public class FragmentWrapper {
- private final MajorFragmentProfile major;
-
- public FragmentWrapper(MajorFragmentProfile major) {
- this.major = Preconditions.checkNotNull(major);
- }
-
- public String getDisplayName() {
- return String.format("Major Fragment: %s", new OperatorPathBuilder().setMajor(major).build());
- }
-
- public String getId() {
- return String.format("fragment-%s", major.getMajorFragmentId());
- }
-
- public void addSummary(TableBuilder tb) {
- final String fmt = " (%d)";
- long t0 = profile.getStart();
-
- ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
- Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
-
- tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
- tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
-
- if (complete.size() < 1) {
- tb.appendRepeated("", null, 7);
- return;
- }
-
- int li = complete.size() - 1;
-
- Collections.sort(complete, Comparators.startTimeCompare);
- tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
- tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
- Collections.sort(complete, Comparators.endTimeCompare);
- tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
- tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
- long total = 0;
- for (MinorFragmentProfile p : complete) {
- total += p.getEndTime() - p.getStartTime();
- }
- Collections.sort(complete, Comparators.runTimeCompare);
- tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
- String.format(fmt, complete.get(0).getMinorFragmentId()));
- tb.appendMillis((long) (total / complete.size()), null);
- tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
- String.format(fmt, complete.get(li).getMinorFragmentId()));
- }
-
- public String getContent() {
- return majorFragmentTimingProfile(major);
- }
- }
-
- public class OperatorWrapper {
- private final int major;
- private List<ImmutablePair<OperatorProfile, Integer>> ops;
-
- public OperatorWrapper(int major, List<ImmutablePair<OperatorProfile, Integer>> ops) {
- assert ops.size() > 0;
- this.major = major;
- this.ops = ops;
- }
-
- public String getDisplayName() {
- OperatorProfile op = ops.get(0).getLeft();
- String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
- CoreOperatorType operatorType = CoreOperatorType.valueOf(op.getOperatorType());
- return String.format("%s - %s", path, operatorType == null ? "UKNOWN_OPERATOR" : operatorType.toString());
- }
-
- public String getId() {
- return String.format("operator-%d-%d", major, ops.get(0).getLeft().getOperatorId());
- }
-
- public String getContent() {
- final String [] columns = {"Minor Fragment", "Setup", "Process", "Wait", "Max Batches", "Max Records"};
- TableBuilder builder = new TableBuilder(columns);
-
- for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
- int minor = ip.getRight();
- OperatorProfile op = ip.getLeft();
-
- String path = new OperatorPathBuilder().setMajor(major).setMinor(minor).setOperator(op).build();
- builder.appendCell(path, null);
- builder.appendNanos(op.getSetupNanos(), null);
- builder.appendNanos(op.getProcessNanos(), null);
- builder.appendNanos(op.getWaitNanos(), null);
-
- long maxBatches = Long.MIN_VALUE;
- long maxRecords = Long.MIN_VALUE;
- for (StreamProfile sp : op.getInputProfileList()) {
- maxBatches = Math.max(sp.getBatches(), maxBatches);
- maxRecords = Math.max(sp.getRecords(), maxRecords);
- }
-
- builder.appendInteger(maxBatches, null);
- builder.appendInteger(maxRecords, null);
- }
- return builder.toString();
- }
-
- public void addSummary(TableBuilder tb) {
- OperatorProfile op = ops.get(0).getLeft();
- String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
- tb.appendCell(path, null);
- CoreOperatorType operatorType = CoreOperatorType.valueOf(ops.get(0).getLeft().getOperatorType());
- tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null);
-
- int li = ops.size() - 1;
- String fmt = " (%s)";
-
- double setupSum = 0.0;
- double processSum = 0.0;
- double waitSum = 0.0;
- for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
- setupSum += ip.getLeft().getSetupNanos();
- processSum += ip.getLeft().getProcessNanos();
- waitSum += ip.getLeft().getWaitNanos();
- }
-
- Collections.sort(ops, Comparators.setupTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
- tb.appendNanos((long) (setupSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
-
- Collections.sort(ops, Comparators.processTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
- tb.appendNanos((long) (processSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
-
- Collections.sort(ops, Comparators.waitTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
- tb.appendNanos((long) (waitSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
- }
- }
-
- static class Comparators {
- final static Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
- public int compare(MajorFragmentProfile o1, MajorFragmentProfile o2) {
- return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
- }
- };
-
- final static Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
- return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
- }
- };
-
- final static Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
- return Long.compare(o1.getStartTime(), o2.getStartTime());
- }
- };
-
- final static Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
- return Long.compare(o1.getEndTime(), o2.getEndTime());
- }
- };
-
- final static Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
- return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
- }
- };
-
- final static Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
- public int compare(OperatorProfile o1, OperatorProfile o2) {
- return Long.compare(o1.getOperatorId(), o2.getOperatorId());
- }
- };
-
- final static Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
- return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
- }
- };
-
- final static Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
- return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
- }
- };
-
- final static Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
- return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
- }
- };
- }
-
- private static class Filters {
- final static Predicate<MinorFragmentProfile> hasOperators = new Predicate<MinorFragmentProfile>() {
- public boolean apply(MinorFragmentProfile arg0) {
- return arg0.getOperatorProfileCount() != 0;
- }
- };
-
- final static Predicate<MinorFragmentProfile> hasTimes = new Predicate<MinorFragmentProfile>() {
- public boolean apply(MinorFragmentProfile arg0) {
- return arg0.hasStartTime() && arg0.hasEndTime();
- }
- };
-
- final static Predicate<MinorFragmentProfile> hasOperatorsAndTimes = Predicates.and(Filters.hasOperators, Filters.hasTimes);
-
- final static Predicate<MinorFragmentProfile> missingOperatorsOrTimes = Predicates.not(hasOperatorsAndTimes);
- }
-
- class TableBuilder {
- NumberFormat format = NumberFormat.getInstance(Locale.US);
- DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
-
- StringBuilder sb;
- int w = 0;
- int width;
-
- public TableBuilder(String[] columns) {
- sb = new StringBuilder();
- width = columns.length;
-
- format.setMaximumFractionDigits(3);
- format.setMinimumFractionDigits(3);
-
- sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
- for (String cn : columns) {
- sb.append("<th>" + cn + "</th>");
- }
- sb.append("</tr>\n");
- }
-
- public void appendCell(String s, String link) {
- if (w == 0) {
- sb.append("<tr>");
- }
- sb.append(String.format("<td>%s%s</td>", s, link != null ? link : ""));
- if (++w >= width) {
- sb.append("</tr>\n");
- w = 0;
- }
- }
-
- public void appendRepeated(String s, String link, int n) {
- for (int i = 0; i < n; i++) {
- appendCell(s, link);
- }
- }
-
- public void appendTime(long d, String link) {
- appendCell(dateFormat.format(d), link);
- }
-
- public void appendMillis(long p, String link) {
- appendCell(format.format(p / 1000.0), link);
- }
-
- public void appendNanos(long p, String link) {
- appendMillis((long) (p / 1000.0 / 1000.0), link);
- }
-
- public void appendFormattedNumber(Number n, String link) {
- appendCell(format.format(n), link);
- }
-
- public void appendInteger(long l, String link) {
- appendCell(Long.toString(l), link);
- }
-
- @Override
- public String toString() {
- String rv;
- rv = sb.append("\n</table>").toString();
- sb = null;
- return rv;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
new file mode 100644
index 0000000..e9024ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.Comparator;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+
+/**
+ * Orderings shared by the profile page wrappers. Every comparator sorts ascending on a single numeric
+ * attribute of the profile object it receives.
+ */
+interface Comparators {
+  /** Ascending by major fragment id. */
+  Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
+    @Override
+    public int compare(MajorFragmentProfile left, MajorFragmentProfile right) {
+      final long leftId = left.getMajorFragmentId();
+      final long rightId = right.getMajorFragmentId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Ascending by minor fragment id. */
+  Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftId = left.getMinorFragmentId();
+      final long rightId = right.getMinorFragmentId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Ascending by reported start time. */
+  Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftStart = left.getStartTime();
+      final long rightStart = right.getStartTime();
+      return Long.compare(leftStart, rightStart);
+    }
+  };
+
+  /** Ascending by reported end time. */
+  Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftEnd = left.getEndTime();
+      final long rightEnd = right.getEndTime();
+      return Long.compare(leftEnd, rightEnd);
+    }
+  };
+
+  /** Ascending by the maximum memory the minor fragment reports having used. */
+  Comparator<MinorFragmentProfile> fragPeakMemAllocated = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftPeak = left.getMaxMemoryUsed();
+      final long rightPeak = right.getMaxMemoryUsed();
+      return Long.compare(leftPeak, rightPeak);
+    }
+  };
+
+  /** Ascending by run time, i.e. end time minus start time. */
+  Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
+    @Override
+    public int compare(MinorFragmentProfile left, MinorFragmentProfile right) {
+      final long leftRunTime = left.getEndTime() - left.getStartTime();
+      final long rightRunTime = right.getEndTime() - right.getStartTime();
+      return Long.compare(leftRunTime, rightRunTime);
+    }
+  };
+
+  /** Ascending by operator id. */
+  Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
+    @Override
+    public int compare(OperatorProfile left, OperatorProfile right) {
+      final long leftId = left.getOperatorId();
+      final long rightId = right.getOperatorId();
+      return Long.compare(leftId, rightId);
+    }
+  };
+
+  /** Ascending by the setup nanos of the operator profile held on the left side of each pair. */
+  Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final OperatorProfile leftOp = left.getLeft();
+      final OperatorProfile rightOp = right.getLeft();
+      return Long.compare(leftOp.getSetupNanos(), rightOp.getSetupNanos());
+    }
+  };
+
+  /** Ascending by the process nanos of the operator profile held on the left side of each pair. */
+  Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final OperatorProfile leftOp = left.getLeft();
+      final OperatorProfile rightOp = right.getLeft();
+      return Long.compare(leftOp.getProcessNanos(), rightOp.getProcessNanos());
+    }
+  };
+
+  /** Ascending by the wait nanos of the operator profile held on the left side of each pair. */
+  Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final OperatorProfile leftOp = left.getLeft();
+      final OperatorProfile rightOp = right.getLeft();
+      return Long.compare(leftOp.getWaitNanos(), rightOp.getWaitNanos());
+    }
+  };
+
+  /** Ascending by the peak local memory allocated by the operator profile on the left side of each pair. */
+  Comparator<Pair<OperatorProfile, Integer>> opPeakMem = new Comparator<Pair<OperatorProfile, Integer>>() {
+    @Override
+    public int compare(Pair<OperatorProfile, Integer> left, Pair<OperatorProfile, Integer> right) {
+      final OperatorProfile leftOp = left.getLeft();
+      final OperatorProfile rightOp = right.getLeft();
+      return Long.compare(leftOp.getPeakLocalMemoryAllocated(), rightOp.getPeakLocalMemoryAllocated());
+    }
+  };
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
new file mode 100644
index 0000000..7dbe947
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Filters.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+/**
+ * Guava predicates that split minor fragment profiles into those carrying enough data to render
+ * (operator profiles plus both timestamps) and those that do not.
+ */
+interface Filters {
+  /** Accepts a minor fragment whose operator profile count is non-zero. */
+  Predicate<MinorFragmentProfile> hasOperators = new Predicate<MinorFragmentProfile>() {
+    @Override
+    public boolean apply(MinorFragmentProfile fragment) {
+      final int operatorCount = fragment.getOperatorProfileCount();
+      return operatorCount != 0;
+    }
+  };
+
+  /** Accepts a minor fragment that has both a start time and an end time set. */
+  Predicate<MinorFragmentProfile> hasTimes = new Predicate<MinorFragmentProfile>() {
+    @Override
+    public boolean apply(MinorFragmentProfile fragment) {
+      if (!fragment.hasStartTime()) {
+        return false;
+      }
+      return fragment.hasEndTime();
+    }
+  };
+
+  /** Accepts a minor fragment that satisfies both {@link #hasOperators} and {@link #hasTimes}. */
+  Predicate<MinorFragmentProfile> hasOperatorsAndTimes = Predicates.and(hasOperators, hasTimes);
+
+  /** Exact complement of {@link #hasOperatorsAndTimes}. */
+  Predicate<MinorFragmentProfile> missingOperatorsOrTimes = Predicates.not(hasOperatorsAndTimes);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
new file mode 100644
index 0000000..3a66327
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+
+/**
+ * Renders one major fragment of a query profile as HTML table content: a single summary row across its
+ * minor fragments ({@link #addSummary(TableBuilder)}) and a per-minor-fragment detail table
+ * ({@link #getContent()}).
+ */
+public class FragmentWrapper {
+  /**
+   * Number of cells {@link #addSummary(TableBuilder)} appends after the two leading cells when at least one
+   * minor fragment is complete: first/last start, first/last end, min/avg/max run time and peak memory.
+   * The empty-row padding must use the same count or every following row shifts.
+   */
+  private static final int NUM_NULLABLE_OVERVIEW_COLUMNS = 8;
+
+  private final MajorFragmentProfile major;
+  // Baseline subtracted from minor fragment start/end times before they are displayed.
+  private final long start;
+
+  /**
+   * @param major profile of the major fragment to render; must not be null
+   * @param start baseline that minor fragment start and end times are shown relative to
+   */
+  public FragmentWrapper(MajorFragmentProfile major, long start) {
+    this.major = Preconditions.checkNotNull(major);
+    this.start = start;
+  }
+
+  /** @return heading text for this fragment, built from its operator path */
+  public String getDisplayName() {
+    return String.format("Major Fragment: %s", new OperatorPathBuilder().setMajor(major).build());
+  }
+
+  /** @return identifier of the form {@code fragment-<major fragment id>} */
+  public String getId() {
+    return String.format("fragment-%s", major.getMajorFragmentId());
+  }
+
+  /**
+   * Appends this fragment's summary row to the given table: path, "complete / total" minor fragment count,
+   * then first/last start, first/last end, min/avg/max run time and the largest peak memory among the
+   * complete minor fragments. When no minor fragment is complete the remaining cells are left empty.
+   *
+   * @param tb table to append the row's cells to
+   */
+  public void addSummary(TableBuilder tb) {
+    final String fmt = " (%d)";
+    final long t0 = start;
+
+    ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
+        Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
+
+    tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
+    tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
+
+    if (complete.size() < 1) {
+      // Pad with exactly as many cells as the populated path below appends, so the row stays aligned.
+      tb.appendRepeated("", null, NUM_NULLABLE_OVERVIEW_COLUMNS);
+      return;
+    }
+
+    final int li = complete.size() - 1;
+
+    // After each sort, index 0 holds the smallest value and index li the largest.
+    Collections.sort(complete, Comparators.startTimeCompare);
+    tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    Collections.sort(complete, Comparators.endTimeCompare);
+    tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    long total = 0;
+    for (MinorFragmentProfile p : complete) {
+      total += p.getEndTime() - p.getStartTime();
+    }
+    Collections.sort(complete, Comparators.runTimeCompare);
+    tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
+        String.format(fmt, complete.get(0).getMinorFragmentId()));
+    tb.appendMillis(total / complete.size(), null);
+    tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
+        String.format(fmt, complete.get(li).getMinorFragmentId()));
+
+    Collections.sort(complete, Comparators.fragPeakMemAllocated);
+    tb.appendBytes(complete.get(li).getMaxMemoryUsed(), null);
+  }
+
+  /** @return the per-minor-fragment detail table for the wrapped major fragment */
+  public String getContent() {
+    return majorFragmentTimingProfile(major);
+  }
+
+  /**
+   * Builds the detail table for a major fragment. Complete minor fragments (operators and both timestamps
+   * present) are listed first in minor-id order with full timing, volume and memory figures; incomplete
+   * ones follow with their state repeated in every remaining column.
+   *
+   * @param major major fragment whose minor fragments are listed
+   * @return the rendered table
+   */
+  public String majorFragmentTimingProfile(MajorFragmentProfile major) {
+    final String[] columns = {"Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches", "Peak Memory", "State"};
+    TableBuilder builder = new TableBuilder(columns);
+
+    ArrayList<MinorFragmentProfile> complete, incomplete;
+    complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
+        major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
+    incomplete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
+        major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
+
+    Collections.sort(complete, Comparators.minorIdCompare);
+    for (MinorFragmentProfile minor : complete) {
+      ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
+
+      final long t0 = start;
+      long biggestIncomingRecords = 0;
+      long biggestBatches = 0;
+
+      // Per operator, total the records/batches over all of its input streams; keep the largest totals.
+      for (OperatorProfile op : ops) {
+        long incomingRecords = 0;
+        long batches = 0;
+        for (StreamProfile sp : op.getInputProfileList()) {
+          incomingRecords += sp.getRecords();
+          batches += sp.getBatches();
+        }
+        biggestIncomingRecords = Math.max(biggestIncomingRecords, incomingRecords);
+        biggestBatches = Math.max(biggestBatches, batches);
+      }
+
+      builder.appendCell(new OperatorPathBuilder().setMajor(major).setMinor(minor).build(), null);
+      builder.appendCell(minor.getEndpoint().getAddress(), null);
+      builder.appendMillis(minor.getStartTime() - t0, null);
+      builder.appendMillis(minor.getEndTime() - t0, null);
+      builder.appendMillis(minor.getEndTime() - minor.getStartTime(), null);
+
+      builder.appendFormattedInteger(biggestIncomingRecords, null);
+      builder.appendFormattedInteger(biggestBatches, null);
+      builder.appendBytes(minor.getMaxMemoryUsed(), null);
+      builder.appendCell(minor.getState().name(), null);
+    }
+    for (MinorFragmentProfile m : incomplete) {
+      builder.appendCell(
+          major.getMajorFragmentId() + "-"
+          + m.getMinorFragmentId(), null);
+      // Fill every column after the label so the row is as wide as the header.
+      builder.appendRepeated(m.getState().toString(), null, columns.length - 1);
+    }
+    return builder.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
new file mode 100644
index 0000000..4cf378f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorPathBuilder.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+
+/**
+ * Builds the {@code major-minor-operator} label used on the profile pages. Each component is shown as at
+ * least two characters; a component that was never set is rendered as {@code xx}.
+ */
+public class OperatorPathBuilder {
+  private static final String OPERATOR_PATH_PATTERN = "%s-%s-%s";
+  private static final String DEFAULT = "xx";
+  // Components shorter than this are zero-padded on the left; longer ones are kept whole.
+  private static final int MIN_COMPONENT_WIDTH = 2;
+  private String major;
+  private String minor;
+  private String operator;
+
+  /** Creates a builder with every component unset. */
+  public OperatorPathBuilder() {
+    clear();
+  }
+
+  /** Resets every component to the unset placeholder. */
+  public void clear() {
+    major = DEFAULT;
+    minor = DEFAULT;
+    operator = DEFAULT;
+  }
+
+  /**
+   * Left-pads the text with zeros to two characters. Text that is already two or more characters long is
+   * returned unchanged, so ids of 100 and above keep all of their digits.
+   *
+   * @param text value to pad; must not be null
+   * @return the padded text
+   */
+  protected String leftPad(String text) {
+    if (text.length() >= MIN_COMPONENT_WIDTH) {
+      return text;
+    }
+    return String.format("00%s", text).substring(text.length());
+  }
+
+  /**
+   * Sets the major component from a profile; a null profile leaves the component as it was.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setMajor(MajorFragmentProfile major) {
+    if (major != null) {
+      return setMajor(major.getMajorFragmentId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the major component from a major fragment id.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setMajor(int newMajor) {
+    major = leftPad(String.valueOf(newMajor));
+    return this;
+  }
+
+  /**
+   * Sets the minor component from a profile; a null profile leaves the component as it was.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setMinor(MinorFragmentProfile minor) {
+    if (minor != null) {
+      return setMinor(minor.getMinorFragmentId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the minor component from a minor fragment id.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setMinor(int newMinor) {
+    minor = leftPad(String.valueOf(newMinor));
+    return this;
+  }
+
+  /**
+   * Sets the operator component from a profile; a null profile leaves the component as it was.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setOperator(OperatorProfile op) {
+    if (op != null) {
+      return setOperator(op.getOperatorId());
+    }
+    return this;
+  }
+
+  /**
+   * Sets the operator component from an operator id.
+   *
+   * @return this builder
+   */
+  public OperatorPathBuilder setOperator(int newOp) {
+    operator = leftPad(String.valueOf(newOp));
+    return this;
+  }
+
+  /** @return the three components joined with dashes, e.g. {@code 01-xx-03} */
+  public String build() {
+    return String.format(OPERATOR_PATH_PATTERN, major, minor, operator);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
new file mode 100644
index 0000000..4f4fcdb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+
+/**
+ * Renders the profiles of one operator across the minor fragments of a major fragment, as a summary row
+ * ({@link #addSummary(TableBuilder)}) and a per-minor-fragment detail table ({@link #getContent()}).
+ */
+public class OperatorWrapper {
+  // Label used when the operator type number does not map to a known CoreOperatorType.
+  private static final String UNKNOWN_OPERATOR = "UNKNOWN_OPERATOR";
+
+  private final int major;
+  // Each pair holds an operator profile (left) and the id of the minor fragment it ran in (right).
+  private final List<ImmutablePair<OperatorProfile, Integer>> ops;
+
+  /**
+   * @param major id of the major fragment the operator belongs to
+   * @param ops non-empty list of (operator profile, minor fragment id) pairs; the list is kept by
+   *            reference and re-sorted in place by {@link #addSummary(TableBuilder)}
+   */
+  public OperatorWrapper(int major, List<ImmutablePair<OperatorProfile, Integer>> ops) {
+    assert ops.size() > 0;
+    this.major = major;
+    this.ops = ops;
+  }
+
+  /** Returns the operator type's name, or a fixed placeholder when the type number is not recognized. */
+  private static String operatorTypeName(OperatorProfile op) {
+    CoreOperatorType operatorType = CoreOperatorType.valueOf(op.getOperatorType());
+    return operatorType == null ? UNKNOWN_OPERATOR : operatorType.toString();
+  }
+
+  /** @return heading text of the form {@code <path> - <operator type>} */
+  public String getDisplayName() {
+    OperatorProfile op = ops.get(0).getLeft();
+    String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
+    return String.format("%s - %s", path, operatorTypeName(op));
+  }
+
+  /** @return identifier of the form {@code operator-<major id>-<operator id>} */
+  public String getId() {
+    return String.format("operator-%d-%d", major, ops.get(0).getLeft().getOperatorId());
+  }
+
+  /**
+   * Builds the detail table with one row per minor fragment: path, setup/process/wait time, the largest
+   * batch and record counts seen on any single input stream, and peak local memory.
+   *
+   * @return the rendered table
+   */
+  public String getContent() {
+    final String [] columns = {"Minor Fragment", "Setup", "Process", "Wait", "Max Batches", "Max Records", "Peak Mem"};
+    TableBuilder builder = new TableBuilder(columns);
+
+    for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
+      int minor = ip.getRight();
+      OperatorProfile op = ip.getLeft();
+
+      String path = new OperatorPathBuilder().setMajor(major).setMinor(minor).setOperator(op).build();
+      builder.appendCell(path, null);
+      builder.appendNanos(op.getSetupNanos(), null);
+      builder.appendNanos(op.getProcessNanos(), null);
+      builder.appendNanos(op.getWaitNanos(), null);
+
+      // Start from zero so an operator without input streams shows 0 instead of Long.MIN_VALUE.
+      long maxBatches = 0;
+      long maxRecords = 0;
+      for (StreamProfile sp : op.getInputProfileList()) {
+        maxBatches = Math.max(sp.getBatches(), maxBatches);
+        maxRecords = Math.max(sp.getRecords(), maxRecords);
+      }
+
+      builder.appendFormattedInteger(maxBatches, null);
+      builder.appendFormattedInteger(maxRecords, null);
+      builder.appendBytes(op.getPeakLocalMemoryAllocated(), null);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Appends this operator's summary row: path, operator type, min/avg/max of setup, process and wait
+   * time (min and max annotated with the minor fragment id), then average and maximum peak memory.
+   * Sorts the wrapped list in place, so the row order later produced by {@link #getContent()} depends on
+   * whether this method ran first.
+   *
+   * @param tb table to append the row's cells to
+   */
+  public void addSummary(TableBuilder tb) {
+    OperatorProfile op = ops.get(0).getLeft();
+    String path = new OperatorPathBuilder().setMajor(major).setOperator(op).build();
+    tb.appendCell(path, null);
+    tb.appendCell(operatorTypeName(op), null);
+
+    int li = ops.size() - 1;
+    String fmt = " (%s)";
+
+    double setupSum = 0.0;
+    double processSum = 0.0;
+    double waitSum = 0.0;
+    double memSum = 0.0;
+    for (ImmutablePair<OperatorProfile, Integer> ip : ops) {
+      setupSum += ip.getLeft().getSetupNanos();
+      processSum += ip.getLeft().getProcessNanos();
+      waitSum += ip.getLeft().getWaitNanos();
+      memSum += ip.getLeft().getPeakLocalMemoryAllocated();
+    }
+
+    // After each sort, index 0 holds the smallest value and index li the largest.
+    Collections.sort(ops, Comparators.setupTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (setupSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.processTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (processSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.waitTimeSort);
+    tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
+    tb.appendNanos((long) (waitSum / ops.size()), null);
+    tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
+
+    Collections.sort(ops, Comparators.opPeakMem);
+    tb.appendBytes((long) (memSum / ops.size()), null);
+    tb.appendBytes(ops.get(li).getLeft().getPeakLocalMemoryAllocated(), null);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
new file mode 100644
index 0000000..ae04bad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryStatus;
+import org.glassfish.jersey.server.mvc.Viewable;
+
+import com.google.common.collect.Lists;
+
+@Path("/")
+public class ProfileResources {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
+
+ @Inject WorkManager work;
+
+ public static class ProfileInfo implements Comparable<ProfileInfo> {
+ public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+
+ private String queryId;
+ private Date time;
+ private String location;
+ private String foreman;
+ private String query;
+ private String state;
+
+ public ProfileInfo(String queryId, long time, String foreman, String query, String state) {
+ this.queryId = queryId;
+ this.time = new Date(time);
+ this.foreman = foreman;
+ this.location = "http://localhost:8047/profile/" + queryId + ".json";
+ this.query = query = query.substring(0, Math.min(query.length(), 150));
+ this.state = state;
+ }
+
+ public String getQuery(){
+ return query;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getTime() {
+ return format.format(time);
+ }
+
+
+ public String getState() {
+ return state;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ @Override
+ public int compareTo(ProfileInfo other) {
+ return time.compareTo(other.time);
+ }
+
+ public String getForeman() {
+ return foreman;
+ }
+
+ }
+
+ private PStoreProvider provider(){
+ return work.getContext().getPersistentStoreProvider();
+ }
+
+ @XmlRootElement
+ public class QProfiles {
+ private List<ProfileInfo> runningQueries;
+ private List<ProfileInfo> finishedQueries;
+
+ public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries) {
+ this.runningQueries = runningQueries;
+ this.finishedQueries = finishedQueries;
+ }
+
+ public List<ProfileInfo> getRunningQueries() {
+ return runningQueries;
+ }
+
+ public List<ProfileInfo> getFinishedQueries() {
+ return finishedQueries;
+ }
+ }
+
+ @GET
+ @Path("/profiles.json")
+ @Produces(MediaType.APPLICATION_JSON)
+ public QProfiles getProfilesJSON() {
+ PStore<QueryProfile> completed = null;
+ PStore<QueryInfo> running = null;
+ try {
+ completed = provider().getStore(QueryStatus.QUERY_PROFILE);
+ running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ } catch (IOException e) {
+ logger.debug("Failed to get profiles from persistent or ephemeral store.");
+ return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
+ }
+
+ List<ProfileInfo> runningQueries = Lists.newArrayList();
+
+ for (Map.Entry<String, QueryInfo> entry : running) {
+ QueryInfo profile = entry.getValue();
+ runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
+ }
+
+ Collections.sort(runningQueries, Collections.reverseOrder());
+
+
+ List<ProfileInfo> finishedQueries = Lists.newArrayList();
+ for (Map.Entry<String, QueryProfile> entry : completed) {
+ QueryProfile profile = entry.getValue();
+ finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name()));
+ }
+
+ return new QProfiles(runningQueries, finishedQueries);
+ }
+
+ @GET
+ @Path("/profiles")
+ @Produces(MediaType.TEXT_HTML)
+ public Viewable getProfiles() {
+ QProfiles profiles = getProfilesJSON();
+ return new Viewable("/rest/profile/list.ftl", profiles);
+ }
+
+ private QueryProfile getQueryProfile(String queryId) {
+ QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
+
+ // first check local running
+ Foreman f = work.getBee().getForemanForQueryId(id);
+ if(f != null){
+ return f.getQueryStatus().getAsProfile();
+ }
+
+ // then check remote running
+ try{
+ PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ QueryInfo info = runningQueries.get(queryId);
+ return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS);
+ }catch(Exception e){
+ logger.debug("Failure to find query as running profile.", e);
+ }
+
+ // then check blob store
+ try{
+ PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE);
+ return profiles.get(queryId);
+ }catch(Exception e){
+ logger.warn("Failure to load query profile for query {}", queryId, e);
+ }
+
+ // TODO: Improve error messaging.
+ return QueryProfile.getDefaultInstance();
+
+ }
+
+
+ @GET
+ @Path("/profiles/{queryid}.json")
+ @Produces(MediaType.APPLICATION_JSON)
+ public String getProfileJSON(@PathParam("queryid") String queryId) {
+ try {
+ return new String(QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
+ } catch (IOException e) {
+ logger.debug("Failed to serialize profile for: " + queryId);
+ return ("{ 'message' : 'error (unable to serialize profile)' }");
+ }
+ }
+
+ @GET
+ @Path("/profiles/{queryid}")
+ @Produces(MediaType.TEXT_HTML)
+ public Viewable getProfile(@PathParam("queryid") String queryId) {
+ ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
+
+ return new Viewable("/rest/profile/profile.ftl", wrapper);
+
+ }
+
+
+ @GET
+ @Path("/profiles/cancel/{queryid}")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+
+ QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
+
+ // first check local running
+ Foreman f = work.getBee().getForemanForQueryId(id);
+ if(f != null){
+ f.cancel();
+ return String.format("Cancelled query %s on locally running node.", queryId);
+ }
+
+ // then check remote running
+ try{
+ PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ QueryInfo info = runningQueries.get(queryId);
+ Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
+ if(a.getOk()){
+ return String.format("Query %s canceled on node %s.", queryId, info.getForeman().getAddress());
+ }else{
+ return String.format("Attempted to cancel query %s on %s but the query is no longer active on that node.", queryId, info.getForeman().getAddress());
+ }
+ }catch(Exception e){
+ logger.debug("Failure to find query as running profile.", e);
+ return String.format("Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
new file mode 100644
index 0000000..80016aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class ProfileWrapper {
+
+ public QueryProfile profile;
+ public String id;
+
+ public ProfileWrapper(QueryProfile profile) {
+ this.profile = profile;
+ this.id = QueryIdHelper.getQueryId(profile.getId());
+ }
+
+ public QueryProfile getProfile() {
+ return profile;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getQueryId() {
+ return QueryIdHelper.getQueryId(profile.getId());
+ }
+
+ public List<OperatorWrapper> getOperatorProfiles() {
+ List<OperatorWrapper> ows = Lists.newArrayList();
+ Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
+
+ List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+ Collections.sort(majors, Comparators.majorIdCompare);
+ for (MajorFragmentProfile major : majors) {
+
+ List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
+ Collections.sort(minors, Comparators.minorIdCompare);
+ for (MinorFragmentProfile minor : minors) {
+
+ List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
+ Collections.sort(ops, Comparators.operatorIdCompare);
+ for (OperatorProfile op : ops) {
+
+ ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
+ major.getMajorFragmentId(), op.getOperatorId());
+ if (!opmap.containsKey(ip)) {
+ List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
+ opmap.put(ip, l);
+ }
+ opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
+ }
+ }
+ }
+
+ List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
+ Collections.sort(keys);
+
+ for (ImmutablePair<Integer, Integer> ip : keys) {
+ ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
+ }
+
+ return ows;
+ }
+
+ public List<FragmentWrapper> getFragmentProfiles() {
+ List<FragmentWrapper> fws = Lists.newArrayList();
+
+ List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+ Collections.sort(majors, Comparators.majorIdCompare);
+ for (MajorFragmentProfile major : majors) {
+ fws.add(new FragmentWrapper(major, profile.getStart()));
+ }
+
+ return fws;
+ }
+
+ public String getFragmentsOverview() {
+ final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax", "memmax"};
+ TableBuilder tb = new TableBuilder(columns);
+ for (FragmentWrapper fw : getFragmentProfiles()) {
+ fw.addSummary(tb);
+ }
+ return tb.toString();
+ }
+
+
+
+ public String getOperatorsOverview() {
+ final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)", "Mem (avg)", "Mem (max)"};
+ TableBuilder tb = new TableBuilder(columns);
+ for (OperatorWrapper ow : getOperatorProfiles()) {
+ ow.addSummary(tb);
+ }
+ return tb.toString();
+ }
+
+ public String getOperatorsJSON() {
+ StringBuilder sb = new StringBuilder("{");
+ String sep = "";
+ for (CoreOperatorType op : CoreOperatorType.values()) {
+ sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
+ sep = ", ";
+ }
+ return sb.append("}").toString();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
new file mode 100644
index 0000000..72f4436
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest.profile;
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+/**
+ * Incrementally builds an HTML {@code <table>}. The column headers are written
+ * by the constructor; cells are then appended one at a time and wrapped into
+ * rows of {@code width} cells automatically. {@link #toString()} returns the
+ * finished markup.
+ */
+class TableBuilder {
+  NumberFormat format = NumberFormat.getInstance(Locale.US);
+
+  // Duration formatters. They render an elapsed time by formatting it as an instant
+  // that many milliseconds after the epoch, so they must be pinned to UTC; with the
+  // JVM default zone the zone offset would leak into the printed hours and minutes.
+  SimpleDateFormat hours = durationFormat("HH:mm");
+  SimpleDateFormat shours = durationFormat("H:mm");
+  SimpleDateFormat mins = durationFormat("mm:ss");
+  SimpleDateFormat smins = durationFormat("m:ss");
+
+  SimpleDateFormat secs = durationFormat("ss.SSS");
+  SimpleDateFormat ssecs = durationFormat("s.SSS");
+
+  // Time-of-day formatter used by appendTime; deliberately left in the default time zone.
+  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
+
+  // Pinned to Locale.US so separators agree with 'format' regardless of the default locale.
+  DecimalFormat dec = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.US));
+  DecimalFormat intformat = new DecimalFormat("#,###", DecimalFormatSymbols.getInstance(Locale.US));
+
+  StringBuilder sb;
+  int w = 0;
+  int width;
+
+  /**
+   * Starts a table and writes its header row.
+   *
+   * @param columns header captions; their count fixes the number of cells per row
+   */
+  public TableBuilder(String[] columns) {
+    sb = new StringBuilder();
+    width = columns.length;
+
+    format.setMaximumFractionDigits(3);
+    format.setMinimumFractionDigits(3);
+
+    sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
+    for (String cn : columns) {
+      sb.append("<th>").append(cn).append("</th>");
+    }
+    sb.append("</tr>\n");
+  }
+
+  /** Creates a formatter for elapsed times, fixed to UTC. */
+  private static SimpleDateFormat durationFormat(String pattern) {
+    SimpleDateFormat f = new SimpleDateFormat(pattern);
+    f.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return f;
+  }
+
+  /**
+   * Appends one cell, opening a new row before the first cell of a row and
+   * closing the row after its last cell.
+   *
+   * @param s    cell text, written as-is (no HTML escaping is applied)
+   * @param link extra markup written right after {@code s}; {@code null} for none
+   */
+  public void appendCell(String s, String link) {
+    if (w == 0) {
+      sb.append("<tr>");
+    }
+    sb.append(String.format("<td>%s%s</td>", s, link != null ? link : ""));
+    if (++w >= width) {
+      sb.append("</tr>\n");
+      w = 0;
+    }
+  }
+
+  /** Appends the same cell {@code n} times. */
+  public void appendRepeated(String s, String link, int n) {
+    for (int i = 0; i < n; i++) {
+      appendCell(s, link);
+    }
+  }
+
+  /**
+   * Appends a point in time, given as milliseconds since the epoch, formatted as
+   * {@code HH:mm:ss.SSS} in the JVM default time zone.
+   */
+  public void appendTime(long d, String link) {
+    appendCell(dateFormat.format(d), link);
+  }
+
+  /**
+   * Appends an elapsed time using the coarsest pattern that fits: {@code HH:mm}
+   * from 10 hours, {@code H:mm} from 1 hour, {@code mm:ss} from 10 minutes,
+   * {@code m:ss} from 1 minute, {@code ss.SSS} from 10 seconds and {@code s.SSS}
+   * below that. The hour field is an hour-of-day, so it wraps at 24 hours.
+   *
+   * @param p    elapsed time in milliseconds
+   * @param link extra markup for the cell; {@code null} for none
+   */
+  public void appendMillis(long p, String link) {
+    final double seconds = p / 1000.0;
+    final double minutes = seconds / 60;
+    final double hrs = minutes / 60;
+    final SimpleDateFormat timeFormat;
+    if (hrs >= 10) {
+      timeFormat = this.hours;
+    } else if (hrs >= 1) {
+      timeFormat = this.shours;
+    } else if (minutes >= 10) {
+      timeFormat = this.mins;
+    } else if (minutes >= 1) {
+      timeFormat = this.smins;
+    } else if (seconds >= 10) {
+      timeFormat = this.secs;
+    } else {
+      timeFormat = this.ssecs;
+    }
+    // Forward the caller's link; it used to be replaced by null here.
+    appendCell(timeFormat.format(new Date(p)), link);
+  }
+
+  /** Appends an elapsed time given in nanoseconds, truncated to whole milliseconds. */
+  public void appendNanos(long p, String link) {
+    // Integer division truncates toward zero like the former double-based cast,
+    // without losing precision for very large values.
+    appendMillis(p / 1000000L, link);
+  }
+
+  /** Appends a number with exactly three fraction digits (US formatting). */
+  public void appendFormattedNumber(Number n, String link) {
+    appendCell(format.format(n), link);
+  }
+
+  /** Appends an integer with thousands separators. */
+  public void appendFormattedInteger(long n, String link) {
+    appendCell(intformat.format(n), link);
+  }
+
+  /** Appends an integer without any grouping. */
+  public void appendInteger(long l, String link) {
+    appendCell(Long.toString(l), link);
+  }
+
+  /** Appends a byte count rendered in MB, GB or TB. */
+  public void appendBytes(long l, String link) {
+    appendCell(bytePrint(l), link);
+  }
+
+  /**
+   * Renders a byte count using 1024-based units: two decimals for TB and GB,
+   * a rounded whole number for MB, and {@code "-"} for anything below 1 MB.
+   */
+  private String bytePrint(long size) {
+    final double m = size / Math.pow(1024, 2);
+    final double g = size / Math.pow(1024, 3);
+    final double t = size / Math.pow(1024, 4);
+    // '>=' so that exactly one unit is shown in that unit (1 MB used to print "-").
+    if (t >= 1) {
+      return dec.format(t).concat("TB");
+    } else if (g >= 1) {
+      return dec.format(g).concat("GB");
+    } else if (m >= 1) {
+      return intformat.format(m).concat("MB");
+    } else {
+      return "-";
+    }
+  }
+
+  /**
+   * Returns the table markup built so far followed by the closing tag. The
+   * builder is left untouched, so this may be called more than once.
+   */
+  @Override
+  public String toString() {
+    return sb.toString() + "\n</table>";
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index b33042b..378e81a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -486,7 +486,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
case AWAITING_ALLOCATION:
case RUNNING:
if(data.isLocal()){
- rootRunner.cancel();
+ if(rootRunner != null){
+ rootRunner.cancel();
+ }
}else{
bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a9ee7911/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 8fd4b97..1b0885d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -106,7 +106,7 @@ public abstract class AbstractStatusReporter implements StatusReporter{
@Override
public final void fail(FragmentHandle handle, String message, Throwable excep) {
- FragmentStatus.Builder status = getBuilder(FragmentState.FAILED, message, excep);
+ FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, message, excep);
fail(handle, status);
}