You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/16 23:31:24 UTC
[15/32] git commit: Fix and improve runtime stats profiles: stop stats
processing while waiting for next; fix stats collection in PartitionSender
and ScanBatch; add stats to all senders; add wait time to operator profile.
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next.
- Fix stats collection in PartitionSender and ScanBatch
- Add stats to all senders
- Add wait time to operator profile.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fc1a7778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fc1a7778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fc1a7778
Branch: refs/heads/master
Commit: fc1a7778e2af3b07117f99070530dd5a296ebc6d
Parents: 49a9ff2
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Jun 13 13:14:12 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 16 08:04:43 2014 -0700
----------------------------------------------------------------------
.../apache/drill/exec/ops/OperatorStats.java | 20 ++-
.../drill/exec/physical/impl/BaseRootExec.java | 36 +++-
.../physical/impl/RandomReceiverCreator.java | 2 +-
.../drill/exec/physical/impl/ScanBatch.java | 58 +++---
.../drill/exec/physical/impl/ScreenCreator.java | 39 ++--
.../exec/physical/impl/SingleSenderCreator.java | 29 ++-
.../exec/physical/impl/WireRecordBatch.java | 25 ++-
.../BroadcastSenderRootExec.java | 28 ++-
.../impl/mergereceiver/MergingRecordBatch.java | 15 +-
.../PartitionSenderRootExec.java | 18 +-
.../partitionsender/PartitionerTemplate.java | 14 +-
.../drill/exec/record/AbstractRecordBatch.java | 2 +
.../drill/exec/server/rest/ProfileWrapper.java | 177 ++++++++++++-------
.../drill/exec/proto/SchemaUserBitShared.java | 7 +
.../apache/drill/exec/proto/UserBitShared.java | 138 ++++++++++++---
.../drill/exec/proto/beans/OperatorProfile.java | 22 +++
protocol/src/main/protobuf/UserBitShared.proto | 1 +
17 files changed, 466 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 4ac8f74..4afea7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -41,12 +41,15 @@ public class OperatorStats {
private boolean inProcessing = false;
private boolean inSetup = false;
+ private boolean inWait = false;
protected long processingNanos;
protected long setupNanos;
+ protected long waitNanos;
private long processingMark;
private long setupMark;
+ private long waitMark;
private long schemas;
@@ -89,6 +92,20 @@ public class OperatorStats {
inProcessing = false;
}
+ public void startWait() {
+ assert !inWait;
+ stopProcessing();
+ inWait = true;
+ waitMark = System.nanoTime();
+ }
+
+ public void stopWait() {
+ assert inWait;
+ startProcessing();
+ waitNanos += System.nanoTime() - waitMark;
+ inWait = false;
+ }
+
public void batchReceived(int inputIndex, long records, boolean newSchema) {
recordsReceivedByInput[inputIndex] += records;
batchesReceivedByInput[inputIndex]++;
@@ -103,7 +120,8 @@ public class OperatorStats {
.setOperatorType(operatorType) //
.setOperatorId(operatorId) //
.setSetupNanos(setupNanos) //
- .setProcessNanos(processingNanos);
+ .setProcessNanos(processingNanos)
+ .setWaitNanos(waitNanos);
addAllMetrics(b);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 256c106..452052b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,11 +17,25 @@
*/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
public abstract class BaseRootExec implements RootExec {
- protected OperatorStats stats = null;
+ protected SenderStats stats = null;
+ protected OperatorContext oContext = null;
+
+ public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
+ this.stats = new SenderStats(config);
+ context.getStats().addOperatorStats(this.stats);
+ this.oContext = new OperatorContext(config, context, stats);
+ }
@Override
public final boolean next() {
@@ -35,8 +49,24 @@ public abstract class BaseRootExec implements RootExec {
}
}
- public void setStats(OperatorStats stats) {
- this.stats = stats;
+ public final IterOutcome next(RecordBatch b){
+ stats.stopProcessing();
+ IterOutcome next;
+ try {
+ next = b.next();
+ } finally {
+ stats.startProcessing();
+ }
+
+ switch(next){
+ case OK_NEW_SCHEMA:
+ stats.batchReceived(0, b.getRecordCount(), true);
+ break;
+ case OK:
+ stats.batchReceived(0, b.getRecordCount(), false);
+ break;
+ }
+ return next;
}
public abstract boolean innerNext();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
index 966c221..4ff5831 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -39,7 +39,7 @@ public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
assert buffers.length == 1;
RawBatchBuffer buffer = buffers[0];
- return new WireRecordBatch(context, buffer);
+ return new WireRecordBatch(context, buffer, receiver);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d142ff8..55d3f62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -127,34 +127,44 @@ public class ScanBatch implements RecordBatch {
@Override
public IterOutcome next() {
- mutator.allocate(MAX_RECORD_CNT);
- while ((recordCount = currentReader.next()) == 0) {
- try {
- if (!readers.hasNext()) {
- currentReader.cleanup();
+ oContext.getStats().startProcessing();
+ try {
+ mutator.allocate(MAX_RECORD_CNT);
+ while ((recordCount = currentReader.next()) == 0) {
+ try {
+ if (!readers.hasNext()) {
+ currentReader.cleanup();
+ releaseAssets();
+ return IterOutcome.NONE;
+ }
+ oContext.getStats().startSetup();
+ try {
+ currentReader.cleanup();
+ currentReader = readers.next();
+ partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+ currentReader.setup(mutator);
+ mutator.allocate(MAX_RECORD_CNT);
+ addPartitionVectors();
+ } finally {
+ oContext.getStats().stopSetup();
+ }
+ } catch (ExecutionSetupException e) {
+ this.context.fail(e);
releaseAssets();
- return IterOutcome.NONE;
+ return IterOutcome.STOP;
}
- currentReader.cleanup();
- currentReader = readers.next();
- partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
- currentReader.setup(mutator);
- mutator.allocate(MAX_RECORD_CNT);
- addPartitionVectors();
- } catch (ExecutionSetupException e) {
- this.context.fail(e);
- releaseAssets();
- return IterOutcome.STOP;
}
- }
- populatePartitionVectors();
- if (mutator.isNewSchema()) {
- container.buildSchema(SelectionVectorMode.NONE);
- schema = container.getSchema();
- return IterOutcome.OK_NEW_SCHEMA;
- } else {
- return IterOutcome.OK;
+ populatePartitionVectors();
+ if (mutator.isNewSchema()) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ schema = container.getSchema();
+ return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ return IterOutcome.OK;
+ }
+ } finally {
+ oContext.getStats().stopProcessing();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 643552b..86e77d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -21,7 +21,10 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -44,14 +47,14 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
- public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+ public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkNotNull(children);
Preconditions.checkArgument(children.size() == 1);
- return new ScreenRoot(context, children.iterator().next());
+ return new ScreenRoot(context, children.iterator().next(), config);
}
- static class ScreenRoot implements RootExec{
+ static class ScreenRoot extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
volatile boolean ok = true;
@@ -62,9 +65,9 @@ public class ScreenCreator implements RootCreator<Screen>{
final UserClientConnection connection;
private RecordMaterializer materializer;
- public ScreenRoot(FragmentContext context, RecordBatch incoming){
+ public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+ super(context, config);
assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client. As such, this should always be true.";
-
this.context = context;
this.incoming = incoming;
this.connection = context.getConnection();
@@ -72,14 +75,14 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok){
stop();
context.fail(this.listener.ex);
return false;
}
- IterOutcome outcome = incoming.next();
+ IterOutcome outcome = next(incoming);
// logger.debug("Screen Outcome {}", outcome);
switch(outcome){
case STOP: {
@@ -92,7 +95,12 @@ public class ScreenCreator implements RootCreator<Screen>{
.setIsLastChunk(true) //
.build();
QueryWritableBatch batch = new QueryWritableBatch(header);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return false;
@@ -107,7 +115,12 @@ public class ScreenCreator implements RootCreator<Screen>{
.setIsLastChunk(true) //
.build();
QueryWritableBatch batch = new QueryWritableBatch(header);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return false;
@@ -119,7 +132,12 @@ public class ScreenCreator implements RootCreator<Screen>{
// context.getStats().batchesCompleted.inc(1);
// context.getStats().recordsCompleted.inc(incoming.getRecordCount());
QueryWritableBatch batch = materializer.convertNext(false);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return true;
@@ -131,6 +149,7 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public void stop() {
sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 7679701..9e91468 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,7 +22,10 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -44,7 +47,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
- private static class SingleSenderRootExec implements RootExec{
+ private static class SingleSenderRootExec extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
private RecordBatch incoming;
private DataTunnel tunnel;
@@ -53,8 +56,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
private FragmentContext context;
private volatile boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
-
- public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+
+ public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+ super(context, config);
this.incoming = batch;
assert(incoming != null);
this.handle = context.getHandle();
@@ -65,27 +69,37 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok){
incoming.kill();
return false;
}
- IterOutcome out = incoming.next();
+ IterOutcome out = next(incoming);
// logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
sendCount.increment();
- tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+ } finally {
+ stats.stopWait();
+ }
return false;
case OK_NEW_SCHEMA:
case OK:
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
sendCount.increment();
- tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+ } finally {
+ stats.stopWait();
+ }
return true;
case NOT_YET:
@@ -98,6 +112,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
public void stop() {
ok = false;
sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index c7fc813..bc2cdb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,6 +24,9 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -43,14 +46,16 @@ public class WireRecordBatch implements RecordBatch {
private RawFragmentBatchProvider fragProvider;
private FragmentContext context;
private BatchSchema schema;
+ private OperatorStats stats;
- public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
+ public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
// we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
this.batchLoader = new RecordBatchLoader(context.getAllocator());
+ this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0));
}
@Override
@@ -100,14 +105,22 @@ public class WireRecordBatch implements RecordBatch {
@Override
public IterOutcome next() {
+ stats.startProcessing();
try{
- RawFragmentBatch batch = fragProvider.getNext();
-
- // skip over empty batches. we do this since these are basically control messages.
- while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+ RawFragmentBatch batch;
+ try {
+ stats.startWait();
batch = fragProvider.getNext();
+
+ // skip over empty batches. we do this since these are basically control messages.
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+ batch = fragProvider.getNext();
+ }
+ } finally {
+ stats.stopWait();
}
+
if (batch == null){
batchLoader.clear();
return IterOutcome.NONE;
@@ -133,6 +146,8 @@ public class WireRecordBatch implements RecordBatch {
}catch(SchemaChangeException | IOException ex){
context.fail(ex);
return IterOutcome.STOP;
+ } finally {
+ stats.stopProcessing();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 9c55825..a70cd50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,12 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -43,7 +47,7 @@ import org.apache.drill.exec.work.ErrorHelper;
* This is useful in cases such as broadcast join where sending the entire table to join
* to all nodes is cheaper than merging and computing all the joins in the same node.
*/
-public class BroadcastSenderRootExec implements RootExec {
+public class BroadcastSenderRootExec extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
private final FragmentContext context;
private final BroadcastSender config;
@@ -54,7 +58,8 @@ public class BroadcastSenderRootExec implements RootExec {
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
- BroadcastSender config) {
+ BroadcastSender config) throws OutOfMemoryException {
+ super(context, config);
this.ok = true;
this.context = context;
this.incoming = incoming;
@@ -69,20 +74,25 @@ public class BroadcastSenderRootExec implements RootExec {
}
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok) {
context.fail(statusHandler.ex);
return false;
}
- RecordBatch.IterOutcome out = incoming.next();
+ RecordBatch.IterOutcome out = next(incoming);
logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
- tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ stats.startWait();
+ try {
+ tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ } finally {
+ stats.stopWait();
+ }
statusHandler.sendCount.increment();
}
@@ -96,7 +106,12 @@ public class BroadcastSenderRootExec implements RootExec {
}
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
- tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ stats.startWait();
+ try {
+ tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ } finally {
+ stats.stopWait();
+ }
statusHandler.sendCount.increment();
}
@@ -135,6 +150,7 @@ public class BroadcastSenderRootExec implements RootExec {
public void stop() {
ok = false;
statusHandler.sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 25ee667..a5d80b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -119,13 +119,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
- long startNext = System.nanoTime();
- RawFragmentBatch b = provider.getNext();
- if(b != null){
- stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ stats.startWait();
+ try {
+ RawFragmentBatch b = provider.getNext();
+ if(b != null){
+ stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ }
+ return b;
+ } finally {
+ stats.stopWait();
}
- stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
- return b;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bb640b4..7535dcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
private HashPartitionSender operator;
private Partitioner partitioner;
private FragmentContext context;
- private OperatorContext oContext;
private boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final StatusHandler statusHandler;
- private final SenderStats stats;
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
-
+ super(context, operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
this.statusHandler = new StatusHandler(sendCount, context);
- this.stats = new SenderStats(operator);
- context.getStats().addOperatorStats(this.stats);
- setStats(stats);
- this.oContext = new OperatorContext(operator, context, stats);
}
@Override
@@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
return false;
}
- RecordBatch.IterOutcome out = incoming.next();
+ RecordBatch.IterOutcome out = next(incoming);
+
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
case NONE:
@@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
partitioner.flushOutgoingBatches(false, true);
partitioner.clear();
}
- // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
createPartitioner();
} catch (IOException e) {
incoming.kill();
@@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
fieldId,
WritableBatch.getBatchNoHVWrap(0, container, false));
tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
fieldId++;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 9bb24d4..6a26d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -265,7 +265,12 @@ public abstract class PartitionerTemplate implements Partitioner {
oppositeMinorFragmentId,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
} else {
logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
@@ -278,7 +283,12 @@ public abstract class PartitionerTemplate implements Partitioner {
operator.getOppositeMajorFragmentId(),
oppositeMinorFragmentId,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
vectorContainer.clear();
return;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d71b811..72a7d3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -66,7 +66,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
public final IterOutcome next(int inputIndex, RecordBatch b){
+ stats.stopProcessing();
IterOutcome next = b.next();
+ stats.startProcessing();
switch(next){
case OK_NEW_SCHEMA:
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index a1d4df9..2952c41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -23,15 +23,20 @@ import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+import java.text.DateFormat;
import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Locale;
public class ProfileWrapper {
NumberFormat format = NumberFormat.getInstance(Locale.US);
+ DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
public QueryProfile profile;
@@ -46,10 +51,14 @@ public class ProfileWrapper {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("MAJOR FRAGMENTS\nid\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
+ builder.append("MAJOR FRAGMENTS\nid\tfirst start\tlast start\tfirst end\tlast end\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
builder.append("\n");
for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
- builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), new MajorFragmentWrapper(majorProfile).toString()));
+ builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printOperatorsInMajor(majorProfile)));
+ }
+ builder.append("\n");
+ for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
+ builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printMinorFragmentsInMajor(majorProfile)));
}
return builder.toString();
}
@@ -58,8 +67,12 @@ public class ProfileWrapper {
StringBuilder builder = new StringBuilder();
for (MajorFragmentProfile m : profile.getFragmentProfileList()) {
List<Long> totalTimes = Lists.newArrayList();
+ List<Long> startTimes = Lists.newArrayList();
+ List<Long> endTimes = Lists.newArrayList();
for (MinorFragmentProfile minorFragmentProfile : m.getMinorFragmentProfileList()) {
totalTimes.add(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime());
+ startTimes.add(minorFragmentProfile.getStartTime());
+ endTimes.add(minorFragmentProfile.getEndTime());
}
long min = Collections.min(totalTimes);
long max = Collections.max(totalTimes);
@@ -67,82 +80,120 @@ public class ProfileWrapper {
for (Long l : totalTimes) {
sum += l;
}
+ long firstStart = Collections.min(startTimes);
+ long lastStart = Collections.max(startTimes);
+ long firstEnd = Collections.min(endTimes);
+ long lastEnd = Collections.max(endTimes);
long avg = sum / totalTimes.size();
- builder.append(String.format("%d\t%s\t%s\t%s\n", m.getMajorFragmentId(), format.format(min), format.format(avg), format.format(max)));
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMajorFragmentId(), dateFormat.format(new Date(firstStart)),
+ dateFormat.format(new Date(lastStart)), dateFormat.format(new Date(firstEnd)), dateFormat.format(new Date(lastEnd)),
+ format.format(min), format.format(avg), format.format(max)));
}
return builder.toString();
}
- public class MajorFragmentWrapper {
- MajorFragmentProfile majorFragmentProfile;
+ public String printMinorFragmentsInMajor(MajorFragmentProfile majorFragmentProfile) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("id\tstart\tend\ttotal time (ms)\tmax records\tbatches\n");
+ for (MinorFragmentProfile m : majorFragmentProfile.getMinorFragmentProfileList()) {
+ long startTime = m.getStartTime();
+ long endTime = m.getEndTime();
+
+ List<OperatorProfile> operators = m.getOperatorProfileList();
+ OperatorProfile biggest = null;
+ int biggestIncomingRecords = 0;
+ for (OperatorProfile oProfile : operators) {
+ if (biggest == null) {
+ biggest = oProfile;
+ int incomingRecordCount = 0;
+ for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+ incomingRecordCount += streamProfile.getRecords();
+ }
+ biggestIncomingRecords = incomingRecordCount;
+ } else {
+ int incomingRecordCount = 0;
+ for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+ incomingRecordCount += streamProfile.getRecords();
+ }
+ if (incomingRecordCount > biggestIncomingRecords) {
+ biggest = oProfile;
+ biggestIncomingRecords = incomingRecordCount;
+ }
+ }
+ }
- public MajorFragmentWrapper(MajorFragmentProfile majorFragmentProfile) {
- this.majorFragmentProfile = majorFragmentProfile;
- }
+ int biggestBatches = 0;
+ for (StreamProfile sProfile : biggest.getInputProfileList()) {
+ biggestBatches += sProfile.getBatches();
+ }
- @Override
- public String toString() {
- return String.format("Minor Fragments\nid\ttotal time (ms)\n%s\nOperators\nid\ttype\tmin\tavg\tmax\t(time in ns)\n%s\n", new MinorFragmentsInMajor().toString(), new OperatorsInMajor().toString());
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\n", m.getMinorFragmentId(), dateFormat.format(new Date(startTime)),
+ dateFormat.format(new Date(endTime)), format.format(endTime - startTime), biggestIncomingRecords, biggestBatches));
}
+ return builder.toString();
+ }
- public class MinorFragmentsInMajor {
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- for (MinorFragmentProfile minorFragmentProfile: majorFragmentProfile.getMinorFragmentProfileList()) {
- builder.append(String.format("%d\t%s\n", minorFragmentProfile.getMinorFragmentId(), format.format(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime())));
+ public String printOperatorsInMajor(MajorFragmentProfile majorFragmentProfile) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("id\ttype\tp min\tp avg\tp max\ts min\ts avg\ts max\tw min\tw avg\tw max\n");
+ int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
+ int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
+ long[][] processing = new long[numOperators + 1][numFragments];
+ long[][] setup = new long[numOperators + 1][numFragments];
+ long[][] wait = new long[numOperators + 1][numFragments];
+ CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
+
+ for (int i = 0; i < numFragments; i++) {
+ MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
+ for (int j = 0; j < numOperators; j++) {
+ OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
+ int operatorId = operatorProfile.getOperatorId();
+ processing[operatorId][i] = operatorProfile.getProcessNanos();
+ setup[operatorId][i] = operatorProfile.getSetupNanos();
+ wait[operatorId][i] = operatorProfile.getWaitNanos();
+ if (i == 0) {
+ operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
}
- return builder.toString();
}
}
- public class OperatorsInMajor {
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
- int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
- long[][] values = new long[numOperators + 1][numFragments];
- CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
-
- for (int i = 0; i < numFragments; i++) {
- MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
- for (int j = 0; j < numOperators; j++) {
- OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
- int operatorId = operatorProfile.getOperatorId();
- values[operatorId][i] = operatorProfile.getProcessNanos() + operatorProfile.getSetupNanos();
- if (i == 0) {
- operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
- }
- }
- }
-
- for (int j = 0; j < numOperators + 1; j++) {
- if (operatorTypes[j] == null) {
- continue;
- }
- long min = Long.MAX_VALUE;
- long max = Long.MIN_VALUE;
- long sum = 0;
-
- for (int i = 0; i < numFragments; i++) {
- min = Math.min(min, values[j][i]);
- max = Math.max(max, values[j][i]);
- sum += values[j][i];
- }
+ for (int j = 0; j < numOperators + 1; j++) {
+ if (operatorTypes[j] == null) {
+ continue;
+ }
+ long processingMin = Long.MAX_VALUE;
+ long processingMax = Long.MIN_VALUE;
+ long processingSum = 0;
+ long setupMin = Long.MAX_VALUE;
+ long setupMax = Long.MIN_VALUE;
+ long setupSum = 0;
+ long waitMin = Long.MAX_VALUE;
+ long waitMax = Long.MIN_VALUE;
+ long waitSum = 0;
+
+ for (int i = 0; i < numFragments; i++) {
+ processingMin = Math.min(processingMin, processing[j][i]);
+ processingMax = Math.max(processingMax, processing[j][i]);
+ processingSum += processing[j][i];
+
+ setupMin = Math.min(setupMin, setup[j][i]);
+ setupMax = Math.max(setupMax, setup[j][i]);
+ setupSum += setup[j][i];
+
+ waitMin = Math.min(waitMin, wait[j][i]);
+ waitMax = Math.max(waitMax, wait[j][i]);
+ waitSum += wait[j][i];
+ }
- long avg = sum / numFragments;
+ long processingAvg = processingSum / numFragments;
+ long setupAvg = setupSum / numFragments;
+ long waitAvg = waitSum / numFragments;
- builder.append(String.format("%d\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(), format.format(min), format.format(avg), format.format(max)));
- }
- return builder.toString();
- }
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(),
+ format.format(processingMin/1000/1000), format.format(processingAvg/1000/1000), format.format(processingMax/1000/1000),
+ format.format(setupMin/1000/1000), format.format(setupAvg/1000/1000), format.format(setupMax/1000/1000),
+ format.format(waitMin/1000/1000), format.format(waitAvg/1000/1000), format.format(waitMax/1000/1000)));
}
+ return builder.toString();
}
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index b4718bb..23fcf21 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -1755,6 +1755,8 @@ public final class SchemaUserBitShared
for(org.apache.drill.exec.proto.UserBitShared.MetricValue metric : message.getMetricList())
output.writeObject(8, metric, org.apache.drill.exec.proto.SchemaUserBitShared.MetricValue.WRITE, true);
+ if(message.hasWaitNanos())
+ output.writeInt64(9, message.getWaitNanos(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.OperatorProfile message)
{
@@ -1817,6 +1819,9 @@ public final class SchemaUserBitShared
builder.addMetric(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.MetricValue.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.MetricValue.MERGE));
break;
+ case 9:
+ builder.setWaitNanos(input.readInt64());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -1864,6 +1869,7 @@ public final class SchemaUserBitShared
case 6: return "processNanos";
case 7: return "localMemoryAllocated";
case 8: return "metric";
+ case 9: return "waitNanos";
default: return null;
}
}
@@ -1882,6 +1888,7 @@ public final class SchemaUserBitShared
fieldMap.put("processNanos", 6);
fieldMap.put("localMemoryAllocated", 7);
fieldMap.put("metric", 8);
+ fieldMap.put("waitNanos", 9);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index c100968..faeba6f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -13836,6 +13836,16 @@ public final class UserBitShared {
*/
org.apache.drill.exec.proto.UserBitShared.MetricValueOrBuilder getMetricOrBuilder(
int index);
+
+ // optional int64 wait_nanos = 9;
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ boolean hasWaitNanos();
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ long getWaitNanos();
}
/**
* Protobuf type {@code exec.shared.OperatorProfile}
@@ -13929,6 +13939,11 @@ public final class UserBitShared {
metric_.add(input.readMessage(org.apache.drill.exec.proto.UserBitShared.MetricValue.PARSER, extensionRegistry));
break;
}
+ case 72: {
+ bitField0_ |= 0x00000020;
+ waitNanos_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14127,6 +14142,22 @@ public final class UserBitShared {
return metric_.get(index);
}
+ // optional int64 wait_nanos = 9;
+ public static final int WAIT_NANOS_FIELD_NUMBER = 9;
+ private long waitNanos_;
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public boolean hasWaitNanos() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public long getWaitNanos() {
+ return waitNanos_;
+ }
+
private void initFields() {
inputProfile_ = java.util.Collections.emptyList();
operatorId_ = 0;
@@ -14135,6 +14166,7 @@ public final class UserBitShared {
processNanos_ = 0L;
localMemoryAllocated_ = 0L;
metric_ = java.util.Collections.emptyList();
+ waitNanos_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -14169,6 +14201,9 @@ public final class UserBitShared {
for (int i = 0; i < metric_.size(); i++) {
output.writeMessage(8, metric_.get(i));
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeInt64(9, waitNanos_);
+ }
getUnknownFields().writeTo(output);
}
@@ -14206,6 +14241,10 @@ public final class UserBitShared {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(8, metric_.get(i));
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(9, waitNanos_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -14346,6 +14385,8 @@ public final class UserBitShared {
} else {
metricBuilder_.clear();
}
+ waitNanos_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -14412,6 +14453,10 @@ public final class UserBitShared {
} else {
result.metric_ = metricBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.waitNanos_ = waitNanos_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -14495,6 +14540,9 @@ public final class UserBitShared {
}
}
}
+ if (other.hasWaitNanos()) {
+ setWaitNanos(other.getWaitNanos());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -15167,6 +15215,39 @@ public final class UserBitShared {
return metricBuilder_;
}
+ // optional int64 wait_nanos = 9;
+ private long waitNanos_ ;
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public boolean hasWaitNanos() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public long getWaitNanos() {
+ return waitNanos_;
+ }
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public Builder setWaitNanos(long value) {
+ bitField0_ |= 0x00000080;
+ waitNanos_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 wait_nanos = 9;</code>
+ */
+ public Builder clearWaitNanos() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ waitNanos_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.shared.OperatorProfile)
}
@@ -16434,38 +16515,39 @@ public final class UserBitShared {
"le\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003" +
"\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_used" +
"\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbit" +
- "Endpoint\"\346\001\n\017OperatorProfile\0221\n\rinput_pr" +
+ "Endpoint\"\372\001\n\017OperatorProfile\0221\n\rinput_pr" +
"ofile\030\001 \003(\0132\032.exec.shared.StreamProfile\022" +
"\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 " +
"\001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nano",
"s\030\006 \001(\003\022\036\n\026local_memory_allocated\030\007 \001(\003\022" +
"(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" +
- "e\"B\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007b" +
- "atches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricV" +
- "alue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 " +
- "\001(\003\022\024\n\014double_value\030\003 \001(\001*5\n\nRpcChannel\022" +
- "\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002" +
- "*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010P" +
- "HYSICAL\020\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022" +
- "\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n",
- "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\362" +
- "\004\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024" +
- "\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH" +
- "_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOI" +
- "N\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020" +
- "\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTI" +
- "TION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\023\n\017RANDOM_REC" +
- "EIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n" +
- "\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING" +
- "_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL",
- "_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SOR" +
- "T\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_" +
- "SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOC" +
- "K_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRE" +
- "CT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_S" +
- "UB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCH" +
- "EMA_SUB_SCAN\020\036\022\013\n\007FLATTEN\020\037B.\n\033org.apach" +
- "e.drill.exec.protoB\rUserBitSharedH\001"
+ "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" +
+ "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" +
+ "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " +
+ "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030" +
+ "\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010" +
+ "BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQueryType\022\007\n\003SQL" +
+ "\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" +
+ "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
+ "ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" +
+ "LLED\020\004\022\n\n\006FAILED\020\005*\362\004\n\020CoreOperatorType\022" +
+ "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
+ "\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
+ "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" +
+ "N_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVE" +
+ "R\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PRO" +
+ "JECT\020\n\022\023\n\017RANDOM_RECEIVER\020\013\022\020\n\014RANGE_SEN" +
+ "DER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_RE" +
+ "MOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_",
+ "N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t" +
+ "\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_G" +
+ "ROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM" +
+ "_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" +
+ "UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
+ "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
+ "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\013\n\007FL" +
+ "ATTEN\020\037B.\n\033org.apache.drill.exec.protoB\r" +
+ "UserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16549,7 +16631,7 @@ public final class UserBitShared {
internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_shared_OperatorProfile_descriptor,
- new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "LocalMemoryAllocated", "Metric", });
+ new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "LocalMemoryAllocated", "Metric", "WaitNanos", });
internal_static_exec_shared_StreamProfile_descriptor =
getDescriptor().getMessageTypes().get(13);
internal_static_exec_shared_StreamProfile_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
index b9ed2e4..f1b1acc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
@@ -56,6 +56,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
private long processNanos;
private long localMemoryAllocated;
private List<MetricValue> metric;
+ private long waitNanos;
public OperatorProfile()
{
@@ -155,6 +156,19 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
return this;
}
+ // waitNanos
+
+ public long getWaitNanos()
+ {
+ return waitNanos;
+ }
+
+ public OperatorProfile setWaitNanos(long waitNanos)
+ {
+ this.waitNanos = waitNanos;
+ return this;
+ }
+
// java serialization
public void readExternal(ObjectInput in) throws IOException
@@ -236,6 +250,9 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
message.metric.add(input.mergeObject(null, MetricValue.getSchema()));
break;
+ case 9:
+ message.waitNanos = input.readInt64();
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -279,6 +296,9 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
}
}
+
+ if(message.waitNanos != 0)
+ output.writeInt64(9, message.waitNanos, false);
}
public String getFieldName(int number)
@@ -292,6 +312,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
case 6: return "processNanos";
case 7: return "localMemoryAllocated";
case 8: return "metric";
+ case 9: return "waitNanos";
default: return null;
}
}
@@ -312,6 +333,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
__fieldMap.put("processNanos", 6);
__fieldMap.put("localMemoryAllocated", 7);
__fieldMap.put("metric", 8);
+ __fieldMap.put("waitNanos", 9);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index b754ee5..eb56efb 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -142,6 +142,7 @@ message OperatorProfile {
optional int64 process_nanos = 6;
optional int64 local_memory_allocated = 7;
repeated MetricValue metric = 8;
+ optional int64 wait_nanos = 9;
}
message StreamProfile {