You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/19 03:03:36 UTC
[1/7] drill git commit: DRILL-2813: Update Hive statistics to use
long instead of int for rowcount per split.
Repository: drill
Updated Branches:
refs/heads/master 238399de5 -> 9ec257efb
DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf155464
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf155464
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf155464
Branch: refs/heads/master
Commit: cf1554641c07359e91a2836d245b0b07438d0cd5
Parents: 238399d
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Apr 16 09:08:00 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 09:11:11 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/store/hive/HiveScan.java | 98 ++++++++++----------
1 file changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/cf155464/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index b96fda4..92635a8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,10 @@ public class HiveScan extends AbstractGroupScan {
private long rowCount = 0;
@JsonCreator
- public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
- @JsonProperty("storage-plugin") String storagePluginName,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+ @JsonProperty("storage-plugin") final String storagePluginName,
+ @JsonProperty("columns") final List<SchemaPath> columns,
+ @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.storagePluginName = storagePluginName;
this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +106,7 @@ public class HiveScan extends AbstractGroupScan {
endpoints = storagePlugin.getContext().getBits();
}
- public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
+ public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.storagePlugin = storagePlugin;
@@ -115,7 +115,7 @@ public class HiveScan extends AbstractGroupScan {
this.storagePluginName = storagePlugin.getName();
}
- private HiveScan(HiveScan that) {
+ private HiveScan(final HiveScan that) {
this.columns = that.columns;
this.endpoints = that.endpoints;
this.hiveReadEntry = that.hiveReadEntry;
@@ -133,14 +133,14 @@ public class HiveScan extends AbstractGroupScan {
private void getSplits() throws ExecutionSetupException {
try {
- List<Partition> partitions = hiveReadEntry.getPartitions();
- Table table = hiveReadEntry.getTable();
+ final List<Partition> partitions = hiveReadEntry.getPartitions();
+ final Table table = hiveReadEntry.getTable();
if (partitions == null || partitions.size() == 0) {
- Properties properties = MetaStoreUtils.getTableMetadata(table);
+ final Properties properties = MetaStoreUtils.getTableMetadata(table);
splitInput(properties, table.getSd(), null);
} else {
- for (Partition partition : partitions) {
- Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ for (final Partition partition : partitions) {
+ final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
splitInput(properties, partition.getSd(), partition);
}
}
@@ -150,27 +150,27 @@ public class HiveScan extends AbstractGroupScan {
}
/* Split the input given in StorageDescriptor */
- private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+ private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
throws ReflectiveOperationException, IOException {
- JobConf job = new JobConf();
- for (Object obj : properties.keySet()) {
+ final JobConf job = new JobConf();
+ for (final Object obj : properties.keySet()) {
job.set((String) obj, (String) properties.get(obj));
}
- for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+ for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
job.set(entry.getKey(), entry.getValue());
}
InputFormat<?, ?> format = (InputFormat<?, ?>)
Class.forName(sd.getInputFormat()).getConstructor().newInstance();
job.setInputFormat(format.getClass());
- Path path = new Path(sd.getLocation());
- FileSystem fs = path.getFileSystem(job);
+ final Path path = new Path(sd.getLocation());
+ final FileSystem fs = path.getFileSystem(job);
// Use new JobConf that has FS configuration
- JobConf jobWithFsConf = new JobConf(fs.getConf());
+ final JobConf jobWithFsConf = new JobConf(fs.getConf());
if (fs.exists(path)) {
FileInputFormat.addInputPath(jobWithFsConf, path);
format = jobWithFsConf.getInputFormat();
- for (InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+ for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
inputSplits.add(split);
partitionMap.put(split, partition);
}
@@ -178,7 +178,7 @@ public class HiveScan extends AbstractGroupScan {
final String numRowsProp = properties.getProperty("numRows");
logger.trace("HiveScan num rows property = {}", numRowsProp);
if (numRowsProp != null) {
- final int numRows = Integer.valueOf(numRowsProp);
+ final long numRows = Long.valueOf(numRowsProp);
// starting from hive-0.13, when no statistics are available, this property is set to -1
// it's important to note that the value returned by hive may not be up to date
if (numRows > 0) {
@@ -188,33 +188,33 @@ public class HiveScan extends AbstractGroupScan {
}
@Override
- public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+ public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = Lists.newArrayList();
for (int i = 0; i < endpoints.size(); i++) {
mappings.add(new ArrayList<InputSplit>());
}
- int count = endpoints.size();
+ final int count = endpoints.size();
for (int i = 0; i < inputSplits.size(); i++) {
mappings.get(i % count).add(inputSplits.get(i));
}
}
- public static String serializeInputSplit(InputSplit split) throws IOException {
- ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+ public static String serializeInputSplit(final InputSplit split) throws IOException {
+ final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
split.write(byteArrayOutputStream);
- String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+ final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
logger.debug("Encoded split string for split {} : {}", split, encoded);
return encoded;
}
@Override
- public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
- List<InputSplit> splits = mappings.get(minorFragmentId);
+ final List<InputSplit> splits = mappings.get(minorFragmentId);
List<HivePartition> parts = Lists.newArrayList();
- List<String> encodedInputSplits = Lists.newArrayList();
- List<String> splitTypes = Lists.newArrayList();
- for (InputSplit split : splits) {
+ final List<String> encodedInputSplits = Lists.newArrayList();
+ final List<String> splitTypes = Lists.newArrayList();
+ for (final InputSplit split : splits) {
HivePartition partition = null;
if (partitionMap.get(split) != null) {
partition = new HivePartition(partitionMap.get(split));
@@ -226,7 +226,7 @@ public class HiveScan extends AbstractGroupScan {
if (parts.contains(null)) {
parts = null;
}
- HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
+ final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
@@ -240,22 +240,22 @@ public class HiveScan extends AbstractGroupScan {
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
- for (DrillbitEndpoint endpoint : endpoints) {
+ final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+ for (final DrillbitEndpoint endpoint : endpoints) {
endpointMap.put(endpoint.getAddress(), endpoint);
logger.debug("endpoing address: {}", endpoint.getAddress());
}
- Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+ final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
try {
long totalSize = 0;
- for (InputSplit split : inputSplits) {
+ for (final InputSplit split : inputSplits) {
totalSize += Math.max(1, split.getLength());
}
- for (InputSplit split : inputSplits) {
- float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
- for (String loc : split.getLocations()) {
+ for (final InputSplit split : inputSplits) {
+ final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+ for (final String loc : split.getLocations()) {
logger.debug("split location: {}", loc);
- DrillbitEndpoint endpoint = endpointMap.get(loc);
+ final DrillbitEndpoint endpoint = endpointMap.get(loc);
if (endpoint != null) {
if (affinityMap.containsKey(endpoint)) {
affinityMap.get(endpoint).addAffinity(affinity);
@@ -265,13 +265,13 @@ public class HiveScan extends AbstractGroupScan {
}
}
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new DrillRuntimeException(e);
}
- for (DrillbitEndpoint ep : affinityMap.keySet()) {
+ for (final DrillbitEndpoint ep : affinityMap.keySet()) {
Preconditions.checkNotNull(ep);
}
- for (EndpointAffinity a : affinityMap.values()) {
+ for (final EndpointAffinity a : affinityMap.values()) {
Preconditions.checkNotNull(a.getEndpoint());
}
return Lists.newArrayList(affinityMap.values());
@@ -281,7 +281,7 @@ public class HiveScan extends AbstractGroupScan {
public ScanStats getScanStats() {
try {
long data =0;
- for (InputSplit split : inputSplits) {
+ for (final InputSplit split : inputSplits) {
data += split.getLength();
}
@@ -292,13 +292,13 @@ public class HiveScan extends AbstractGroupScan {
}
logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new DrillRuntimeException(e);
}
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
return new HiveScan(this);
}
@@ -316,20 +316,20 @@ public class HiveScan extends AbstractGroupScan {
}
@Override
- public GroupScan clone(List<SchemaPath> columns) {
- HiveScan newScan = new HiveScan(this);
+ public GroupScan clone(final List<SchemaPath> columns) {
+ final HiveScan newScan = new HiveScan(this);
newScan.columns = columns;
return newScan;
}
@Override
- public boolean canPushdownProjects(List<SchemaPath> columns) {
+ public boolean canPushdownProjects(final List<SchemaPath> columns) {
return true;
}
// Return true if the current table is partitioned false otherwise
public boolean supportsPartitionFilterPushdown() {
- List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+ final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
if (partitionKeys == null || partitionKeys.size() == 0) {
return false;
}
[3/7] drill git commit: DRILL-2762: Update Fragment state reporting
and error collection
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 84071c3..b1c3fe0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -36,9 +36,9 @@ public class RootFragmentManager implements FragmentManager{
private final FragmentExecutor runner;
private final FragmentHandle handle;
private volatile boolean cancel = false;
- private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+ private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
- public RootFragmentManager(FragmentHandle handle, IncomingBuffers buffers, FragmentExecutor runner) {
+ public RootFragmentManager(final FragmentHandle handle, final IncomingBuffers buffers, final FragmentExecutor runner) {
super();
this.handle = handle;
this.buffers = buffers;
@@ -46,11 +46,16 @@ public class RootFragmentManager implements FragmentManager{
}
@Override
- public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException {
+ public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
return buffers.batchArrived(batch);
}
@Override
+ public void receivingFragmentFinished(final FragmentHandle handle) {
+ throw new IllegalStateException("The root fragment should not be sending any messages to receiver.");
+ }
+
+ @Override
public FragmentExecutor getRunnable() {
return runner;
}
@@ -75,13 +80,13 @@ public class RootFragmentManager implements FragmentManager{
}
@Override
- public void addConnection(RemoteConnection connection) {
+ public void addConnection(final RemoteConnection connection) {
connections.add(connection);
}
@Override
- public void setAutoRead(boolean autoRead) {
- for (RemoteConnection c : connections) {
+ public void setAutoRead(final boolean autoRead) {
+ for (final RemoteConnection c : connections) {
c.setAutoRead(autoRead);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
index 7155d43..0e888c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
@@ -17,26 +17,26 @@
*/
package org.apache.drill.exec.work.fragment;
-import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
-public class StateTransitionException extends DrillException {
+public class StateTransitionException extends DrillRuntimeException {
public StateTransitionException() {
super();
}
- public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ public StateTransitionException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
- public StateTransitionException(String message, Throwable cause) {
+ public StateTransitionException(final String message, final Throwable cause) {
super(message, cause);
}
- public StateTransitionException(String message) {
+ public StateTransitionException(final String message) {
super(message);
}
- public StateTransitionException(Throwable cause) {
+ public StateTransitionException(final Throwable cause) {
super(cause);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
index 2bba345..424e7fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
@@ -25,6 +25,6 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState;
* The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman.
*/
public interface StatusReporter {
- void fail(FragmentHandle handle, String message, UserException excep);
+ void fail(FragmentHandle handle, UserException excep);
void stateChanged(FragmentHandle handle, FragmentState newState);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 933417e..2536bbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.physical.impl;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.common.DeferredException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContext.ExecutorState;
import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RecordBatch;
@@ -40,16 +42,42 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
private final RecordBatch incoming;
private final ScreenRoot screenRoot;
- public SimpleRootExec(RootExec e) {
+ public SimpleRootExec(final RootExec e) {
if (e instanceof ScreenRoot) {
incoming = ((ScreenRoot)e).getIncoming();
screenRoot = (ScreenRoot) e;
} else {
throw new UnsupportedOperationException();
}
+ incoming.getContext().setExecutorState(new DummyExecutorState());
+ }
+
+ private class DummyExecutorState implements ExecutorState {
+ final DeferredException ex = new DeferredException();
+
+ @Override
+ public boolean shouldContinue() {
+ return !isFailed();
+ }
+
+ @Override
+ public void fail(final Throwable t) {
+ ex.addThrowable(t);
+ }
+
+ @Override
+ public boolean isFailed() {
+ return ex.getException() != null;
+ }
+
+ @Override
+ public Throwable getFailureCause() {
+ return ex.getException();
+ }
}
+
public FragmentContext getContext() {
return incoming.getContext();
}
@@ -63,8 +91,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
}
@SuppressWarnings("unchecked")
- public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass) {
- TypedFieldId tfid = incoming.getValueVectorId(path);
+ public <T extends ValueVector> T getValueVectorById(final SchemaPath path, final Class<?> vvClass) {
+ final TypedFieldId tfid = incoming.getValueVectorId(path);
return (T) incoming.getValueAccessorById(vvClass, tfid.getFieldIds()).getValueVector();
}
@@ -86,14 +114,14 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
}
@Override
- public void receivingFragmentFinished(FragmentHandle handle) {
+ public void receivingFragmentFinished(final FragmentHandle handle) {
//no op
}
@Override
public Iterator<ValueVector> iterator() {
- List<ValueVector> vv = Lists.newArrayList();
- for (VectorWrapper<?> vw : incoming) {
+ final List<ValueVector> vv = Lists.newArrayList();
+ for (final VectorWrapper<?> vw : incoming) {
vv.add(vw.getValueVector());
}
return vv.iterator();
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index b2e5740..b8336e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -27,10 +27,7 @@ import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.data.AckSender;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -84,6 +81,7 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
context = Mockito.mock(FragmentContext.class);
Mockito.when(context.getConfig()).thenReturn(dc);
+ Mockito.when(context.shouldContinue()).thenReturn(true);
rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index d9dba6e..eca19a0 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -2081,6 +2081,10 @@ public final class SchemaUserBitShared
if(message.hasEndpoint())
output.writeObject(9, message.getEndpoint(), org.apache.drill.exec.proto.SchemaCoordinationProtos.DrillbitEndpoint.WRITE, false);
+ if(message.hasLastUpdate())
+ output.writeInt64(10, message.getLastUpdate(), false);
+ if(message.hasLastProgress())
+ output.writeInt64(11, message.getLastProgress(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile message)
{
@@ -2150,6 +2154,12 @@ public final class SchemaUserBitShared
builder.setEndpoint(input.mergeObject(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.newBuilder(), org.apache.drill.exec.proto.SchemaCoordinationProtos.DrillbitEndpoint.MERGE));
break;
+ case 10:
+ builder.setLastUpdate(input.readInt64());
+ break;
+ case 11:
+ builder.setLastProgress(input.readInt64());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -2199,6 +2209,8 @@ public final class SchemaUserBitShared
case 7: return "memoryUsed";
case 8: return "maxMemoryUsed";
case 9: return "endpoint";
+ case 10: return "lastUpdate";
+ case 11: return "lastProgress";
default: return null;
}
}
@@ -2219,6 +2231,8 @@ public final class SchemaUserBitShared
fieldMap.put("memoryUsed", 7);
fieldMap.put("maxMemoryUsed", 8);
fieldMap.put("endpoint", 9);
+ fieldMap.put("lastUpdate", 10);
+ fieldMap.put("lastProgress", 11);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index acd8624..a229450 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -236,6 +236,10 @@ public final class UserBitShared {
* <code>FAILED = 5;</code>
*/
FAILED(5, 5),
+ /**
+ * <code>CANCELLATION_REQUESTED = 6;</code>
+ */
+ CANCELLATION_REQUESTED(6, 6),
;
/**
@@ -262,6 +266,10 @@ public final class UserBitShared {
* <code>FAILED = 5;</code>
*/
public static final int FAILED_VALUE = 5;
+ /**
+ * <code>CANCELLATION_REQUESTED = 6;</code>
+ */
+ public static final int CANCELLATION_REQUESTED_VALUE = 6;
public final int getNumber() { return value; }
@@ -274,6 +282,7 @@ public final class UserBitShared {
case 3: return FINISHED;
case 4: return CANCELLED;
case 5: return FAILED;
+ case 6: return CANCELLATION_REQUESTED;
default: return null;
}
}
@@ -15705,6 +15714,26 @@ public final class UserBitShared {
* <code>optional .exec.DrillbitEndpoint endpoint = 9;</code>
*/
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpointOrBuilder getEndpointOrBuilder();
+
+ // optional int64 last_update = 10;
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ boolean hasLastUpdate();
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ long getLastUpdate();
+
+ // optional int64 last_progress = 11;
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ boolean hasLastProgress();
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ long getLastProgress();
}
/**
* Protobuf type {@code exec.shared.MinorFragmentProfile}
@@ -15827,6 +15856,16 @@ public final class UserBitShared {
bitField0_ |= 0x00000080;
break;
}
+ case 80: {
+ bitField0_ |= 0x00000100;
+ lastUpdate_ = input.readInt64();
+ break;
+ }
+ case 88: {
+ bitField0_ |= 0x00000200;
+ lastProgress_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -16046,6 +16085,38 @@ public final class UserBitShared {
return endpoint_;
}
+ // optional int64 last_update = 10;
+ public static final int LAST_UPDATE_FIELD_NUMBER = 10;
+ private long lastUpdate_;
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public boolean hasLastUpdate() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public long getLastUpdate() {
+ return lastUpdate_;
+ }
+
+ // optional int64 last_progress = 11;
+ public static final int LAST_PROGRESS_FIELD_NUMBER = 11;
+ private long lastProgress_;
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public boolean hasLastProgress() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public long getLastProgress() {
+ return lastProgress_;
+ }
+
private void initFields() {
state_ = org.apache.drill.exec.proto.UserBitShared.FragmentState.SENDING;
error_ = org.apache.drill.exec.proto.UserBitShared.DrillPBError.getDefaultInstance();
@@ -16056,6 +16127,8 @@ public final class UserBitShared {
memoryUsed_ = 0L;
maxMemoryUsed_ = 0L;
endpoint_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.getDefaultInstance();
+ lastUpdate_ = 0L;
+ lastProgress_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -16096,6 +16169,12 @@ public final class UserBitShared {
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeMessage(9, endpoint_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ output.writeInt64(10, lastUpdate_);
+ }
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ output.writeInt64(11, lastProgress_);
+ }
getUnknownFields().writeTo(output);
}
@@ -16141,6 +16220,14 @@ public final class UserBitShared {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(9, endpoint_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(10, lastUpdate_);
+ }
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(11, lastProgress_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -16290,6 +16377,10 @@ public final class UserBitShared {
endpointBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000100);
+ lastUpdate_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000200);
+ lastProgress_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -16367,6 +16458,14 @@ public final class UserBitShared {
} else {
result.endpoint_ = endpointBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+ to_bitField0_ |= 0x00000100;
+ }
+ result.lastUpdate_ = lastUpdate_;
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000200;
+ }
+ result.lastProgress_ = lastProgress_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -16433,6 +16532,12 @@ public final class UserBitShared {
if (other.hasEndpoint()) {
mergeEndpoint(other.getEndpoint());
}
+ if (other.hasLastUpdate()) {
+ setLastUpdate(other.getLastUpdate());
+ }
+ if (other.hasLastProgress()) {
+ setLastProgress(other.getLastProgress());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -17135,6 +17240,72 @@ public final class UserBitShared {
return endpointBuilder_;
}
+ // optional int64 last_update = 10;
+ private long lastUpdate_ ;
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public boolean hasLastUpdate() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public long getLastUpdate() {
+ return lastUpdate_;
+ }
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public Builder setLastUpdate(long value) {
+ bitField0_ |= 0x00000200;
+ lastUpdate_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 last_update = 10;</code>
+ */
+ public Builder clearLastUpdate() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ lastUpdate_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional int64 last_progress = 11;
+ private long lastProgress_ ;
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public boolean hasLastProgress() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public long getLastProgress() {
+ return lastProgress_;
+ }
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public Builder setLastProgress(long value) {
+ bitField0_ |= 0x00000400;
+ lastProgress_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 last_progress = 11;</code>
+ */
+ public Builder clearLastProgress() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ lastProgress_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.shared.MinorFragmentProfile)
}
@@ -19954,7 +20125,7 @@ public final class UserBitShared {
"jorFragmentProfile\"t\n\024MajorFragmentProfi" +
"le\022\031\n\021major_fragment_id\030\001 \001(\005\022A\n\026minor_f" +
"ragment_profile\030\002 \003(\0132!.exec.shared.Mino" +
- "rFragmentProfile\"\274\002\n\024MinorFragmentProfil" +
+ "rFragmentProfile\"\350\002\n\024MinorFragmentProfil" +
"e\022)\n\005state\030\001 \001(\0162\032.exec.shared.FragmentS" +
"tate\022(\n\005error\030\002 \001(\0132\031.exec.shared.DrillP" +
"BError\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020ope" +
@@ -19962,42 +20133,44 @@ public final class UserBitShared {
"orProfile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_tim" +
"e\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memo",
"ry_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.D" +
- "rillbitEndpoint\"\377\001\n\017OperatorProfile\0221\n\ri" +
- "nput_profile\030\001 \003(\0132\032.exec.shared.StreamP" +
- "rofile\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_" +
- "type\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rproce" +
- "ss_nanos\030\006 \001(\003\022#\n\033peak_local_memory_allo" +
- "cated\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.share" +
- "d.MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStr" +
- "eamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002" +
- " \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\t",
- "metric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014d" +
- "ouble_value\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_C" +
- "ONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQuer" +
- "yType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020" +
- "\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
- "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
- "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\335\005\n\020CoreO" +
- "peratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADC" +
- "AST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGA" +
- "TE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025H",
- "ASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MER" +
- "GING_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SEN" +
- "DER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER" +
- "\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELE" +
- "CTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGR" +
- "EGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT" +
- "\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032" +
- "\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_S" +
- "CAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB" +
- "_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SU",
- "B_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SC" +
- "AN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_S" +
- "UB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUC" +
- "ER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIN" +
- "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_" +
- "SCAN\020$B.\n\033org.apache.drill.exec.protoB\rU" +
- "serBitSharedH\001"
+ "rillbitEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\r" +
+ "last_progress\030\013 \001(\003\"\377\001\n\017OperatorProfile\022" +
+ "1\n\rinput_profile\030\001 \003(\0132\032.exec.shared.Str" +
+ "eamProfile\022\023\n\013operator_id\030\003 \001(\005\022\025\n\ropera" +
+ "tor_type\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rp" +
+ "rocess_nanos\030\006 \001(\003\022#\n\033peak_local_memory_" +
+ "allocated\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.s" +
+ "hared.MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n" +
+ "\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batch",
+ "es\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue" +
+ "\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022" +
+ "\024\n\014double_value\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013B" +
+ "IT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\t" +
+ "QueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSI" +
+ "CAL\020\003*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023" +
+ "AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FI" +
+ "NISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026C" +
+ "ANCELLATION_REQUESTED\020\006*\335\005\n\020CoreOperator" +
+ "Type\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SEN",
+ "DER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n" +
+ "\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PAR" +
+ "TITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RE" +
+ "CEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013" +
+ "\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014R" +
+ "ANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_V" +
+ "ECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017" +
+ "\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005T" +
+ "RACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQU" +
+ "ET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025",
+ "\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030" +
+ "\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020" +
+ "\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n" +
+ "\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN" +
+ "\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONS" +
+ "UMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024" +
+ "\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$B" +
+ ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+ "haredH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -20099,7 +20272,7 @@ public final class UserBitShared {
internal_static_exec_shared_MinorFragmentProfile_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_shared_MinorFragmentProfile_descriptor,
- new java.lang.String[] { "State", "Error", "MinorFragmentId", "OperatorProfile", "StartTime", "EndTime", "MemoryUsed", "MaxMemoryUsed", "Endpoint", });
+ new java.lang.String[] { "State", "Error", "MinorFragmentId", "OperatorProfile", "StartTime", "EndTime", "MemoryUsed", "MaxMemoryUsed", "Endpoint", "LastUpdate", "LastProgress", });
internal_static_exec_shared_OperatorProfile_descriptor =
getDescriptor().getMessageTypes().get(16);
internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
index ba536fc..4d3779b 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/FragmentState.java
@@ -27,7 +27,8 @@ public enum FragmentState implements com.dyuproject.protostuff.EnumLite<Fragment
RUNNING(2),
FINISHED(3),
CANCELLED(4),
- FAILED(5);
+ FAILED(5),
+ CANCELLATION_REQUESTED(6);
public final int number;
@@ -51,6 +52,7 @@ public enum FragmentState implements com.dyuproject.protostuff.EnumLite<Fragment
case 3: return FINISHED;
case 4: return CANCELLED;
case 5: return FAILED;
+ case 6: return CANCELLATION_REQUESTED;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
index 5cd71f9..fb381a8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/MinorFragmentProfile.java
@@ -58,6 +58,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
private long memoryUsed;
private long maxMemoryUsed;
private DrillbitEndpoint endpoint;
+ private long lastUpdate;
+ private long lastProgress;
public MinorFragmentProfile()
{
@@ -183,6 +185,32 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
return this;
}
+ // lastUpdate
+
+ public long getLastUpdate()
+ {
+ return lastUpdate;
+ }
+
+ public MinorFragmentProfile setLastUpdate(long lastUpdate)
+ {
+ this.lastUpdate = lastUpdate;
+ return this;
+ }
+
+ // lastProgress
+
+ public long getLastProgress()
+ {
+ return lastProgress;
+ }
+
+ public MinorFragmentProfile setLastProgress(long lastProgress)
+ {
+ this.lastProgress = lastProgress;
+ return this;
+ }
+
// java serialization
public void readExternal(ObjectInput in) throws IOException
@@ -269,6 +297,12 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
message.endpoint = input.mergeObject(message.endpoint, DrillbitEndpoint.getSchema());
break;
+ case 10:
+ message.lastUpdate = input.readInt64();
+ break;
+ case 11:
+ message.lastProgress = input.readInt64();
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -313,6 +347,12 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
if(message.endpoint != null)
output.writeObject(9, message.endpoint, DrillbitEndpoint.getSchema(), false);
+
+ if(message.lastUpdate != 0)
+ output.writeInt64(10, message.lastUpdate, false);
+
+ if(message.lastProgress != 0)
+ output.writeInt64(11, message.lastProgress, false);
}
public String getFieldName(int number)
@@ -328,6 +368,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
case 7: return "memoryUsed";
case 8: return "maxMemoryUsed";
case 9: return "endpoint";
+ case 10: return "lastUpdate";
+ case 11: return "lastProgress";
default: return null;
}
}
@@ -350,6 +392,8 @@ public final class MinorFragmentProfile implements Externalizable, Message<Minor
__fieldMap.put("memoryUsed", 7);
__fieldMap.put("maxMemoryUsed", 8);
__fieldMap.put("endpoint", 9);
+ __fieldMap.put("lastUpdate", 10);
+ __fieldMap.put("lastProgress", 11);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 7383bd2..a17dbc7 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -208,6 +208,8 @@ message MinorFragmentProfile {
optional int64 memory_used = 7;
optional int64 max_memory_used = 8;
optional DrillbitEndpoint endpoint = 9;
+ optional int64 last_update = 10;
+ optional int64 last_progress = 11;
}
message OperatorProfile {
@@ -240,6 +242,7 @@ enum FragmentState {
FINISHED = 3;
CANCELLED = 4;
FAILED = 5;
+ CANCELLATION_REQUESTED = 6;
}
enum CoreOperatorType {
[5/7] drill git commit: DRILL-2762: Update Fragment state reporting
and error collection
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4b317e0..7b9fffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,9 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.project;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.PrintStream;
import java.util.HashMap;
import java.util.List;
@@ -63,18 +61,15 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.sun.codemodel.JExpr;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -85,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
- private boolean buildingSchema = true;
+ private final boolean buildingSchema = true;
private static final String EMPTY_STRING = "";
private boolean first = true;
@@ -96,7 +91,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
public String prefix = "";
public HashMap<String, Integer> prefixMap = Maps.newHashMap();
public CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
- private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+ private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
private void clear() {
isStar = false;
@@ -109,7 +104,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+ public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
}
@@ -120,7 +115,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
@Override
- protected void killIncoming(boolean sendUpstream) {
+ protected void killIncoming(final boolean sendUpstream) {
super.killIncoming(sendUpstream);
hasRemainder = false;
}
@@ -157,7 +152,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
if (next == IterOutcome.OK_NEW_SCHEMA) {
try {
setupNewSchema();
- } catch (SchemaChangeException e) {
+ } catch (final SchemaChangeException e) {
throw new RuntimeException(e);
}
}
@@ -172,7 +167,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return IterOutcome.OUT_OF_MEMORY;
}
- int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
+ final int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
setValueCount(outputRecords);
hasRemainder = true;
@@ -180,7 +175,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
this.recordCount = remainderIndex;
} else {
setValueCount(incomingRecordCount);
- for(VectorWrapper<?> v: incoming) {
+ for(final VectorWrapper<?> v: incoming) {
v.clear();
}
this.recordCount = outputRecords;
@@ -195,12 +190,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
private void handleRemainder() {
- int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
+ final int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
if (!doAlloc()) {
outOfMemory = true;
return;
}
- int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
+ final int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
if (projRecords < remainingRecordCount) {
setValueCount(projRecords);
this.recordCount = projRecords;
@@ -209,7 +204,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
setValueCount(remainingRecordCount);
hasRemainder = false;
remainderIndex = 0;
- for (VectorWrapper<?> v : incoming) {
+ for (final VectorWrapper<?> v : incoming) {
v.clear();
}
this.recordCount = remainingRecordCount;
@@ -221,13 +216,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- public void addComplexWriter(ComplexWriter writer) {
+ public void addComplexWriter(final ComplexWriter writer) {
complexWriters.add(writer);
}
private boolean doAlloc() {
//Allocate vv in the allocationVectors.
- for (ValueVector v : this.allocationVectors) {
+ for (final ValueVector v : this.allocationVectors) {
AllocationHelper.allocateNew(v, incoming.getRecordCount());
}
@@ -236,16 +231,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return true;
}
- for (ComplexWriter writer : complexWriters) {
+ for (final ComplexWriter writer : complexWriters) {
writer.allocate();
}
return true;
}
- private void setValueCount(int count) {
- for (ValueVector v : allocationVectors) {
- ValueVector.Mutator m = v.getMutator();
+ private void setValueCount(final int count) {
+ for (final ValueVector v : allocationVectors) {
+ final ValueVector.Mutator m = v.getMutator();
m.setValueCount(count);
}
@@ -253,15 +248,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return;
}
- for (ComplexWriter writer : complexWriters) {
+ for (final ComplexWriter writer : complexWriters) {
writer.setValueCount(count);
}
}
/** hack to make ref and full work together... need to figure out if this is still necessary. **/
- private FieldReference getRef(NamedExpression e) {
- FieldReference ref = e.getRef();
- PathSegment seg = ref.getRootSegment();
+ private FieldReference getRef(final NamedExpression e) {
+ final FieldReference ref = e.getRef();
+ final PathSegment seg = ref.getRootSegment();
// if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
// return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
@@ -269,8 +264,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return ref;
}
- private boolean isAnyWildcard(List<NamedExpression> exprs) {
- for (NamedExpression e : exprs) {
+ private boolean isAnyWildcard(final List<NamedExpression> exprs) {
+ for (final NamedExpression e : exprs) {
if (isWildcard(e)) {
return true;
}
@@ -278,18 +273,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return false;
}
- private boolean isWildcard(NamedExpression ex) {
+ private boolean isWildcard(final NamedExpression ex) {
if ( !(ex.getExpr() instanceof SchemaPath)) {
return false;
}
- NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+ final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
}
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
if (allocationVectors != null) {
- for (ValueVector v : allocationVectors) {
+ for (final ValueVector v : allocationVectors) {
v.clear();
}
}
@@ -305,12 +300,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- IntOpenHashSet transferFieldIds = new IntOpenHashSet();
+ final IntOpenHashSet transferFieldIds = new IntOpenHashSet();
- boolean isAnyWildcard = isAnyWildcard(exprs);
+ final boolean isAnyWildcard = isAnyWildcard(exprs);
- ClassifierResult result = new ClassifierResult();
- boolean classify = isClassificationNeeded(exprs);
+ final ClassifierResult result = new ClassifierResult();
+ final boolean classify = isClassificationNeeded(exprs);
for (int i = 0; i < exprs.size(); i++) {
final NamedExpression namedExpression = exprs.get(i);
@@ -321,33 +316,33 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
if (result.isStar) {
// The value indicates which wildcard we are processing now
- Integer value = result.prefixMap.get(result.prefix);
+ final Integer value = result.prefixMap.get(result.prefix);
if (value != null && value.intValue() == 1) {
int k = 0;
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- SchemaPath originalPath = vvIn.getField().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
- String name = result.outputNames.get(k++); // get the renamed column names
+ final String name = result.outputNames.get(k++); // get the renamed column names
if (name == EMPTY_STRING) {
continue;
}
- FieldReference ref = new FieldReference(name);
- ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
- TransferPair tp = vvIn.makeTransferPair(vvOut);
+ final FieldReference ref = new FieldReference(name);
+ final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+ final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
}
} else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
int k = 0;
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- SchemaPath originalPath = vvIn.getField().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
- String name = result.outputNames.get(k++); // get the renamed column names
+ final String name = result.outputNames.get(k++); // get the renamed column names
if (name == EMPTY_STRING) {
continue;
}
@@ -357,12 +352,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
- MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
- ValueVector vv = container.addOrGet(outputField, callBack);
+ final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
+ final ValueVector vv = container.addOrGet(outputField, callBack);
allocationVectors.add(vv);
- TypedFieldId fid = container.getValueVectorId(outputField.getPath());
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
- HoldingContainer hc = cg.addExpr(write);
+ final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+ final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ final HoldingContainer hc = cg.addExpr(write);
}
}
continue;
@@ -371,7 +366,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// For the columns which do not needed to be classified,
// it is still necessary to ensure the output column name is unique
result.outputNames = Lists.newArrayList();
- String outputName = getRef(namedExpression).getRootSegment().getPath();
+ final String outputName = getRef(namedExpression).getRootSegment().getPath();
addToResultMaps(outputName, result, true);
}
@@ -403,17 +398,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
&& !isAnyWildcard
&& !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
- ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- TypedFieldId id = vectorRead.getFieldId();
- ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ final TypedFieldId id = vectorRead.getFieldId();
+ final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
Preconditions.checkNotNull(incoming);
- FieldReference ref = getRef(namedExpression);
- ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
- TransferPair tp = vvIn.makeTransferPair(vvOut);
+ final FieldReference ref = getRef(namedExpression);
+ final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+ final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
- logger.debug("Added transfer for project expression.");
} else if (expr instanceof DrillFuncHolderExpr &&
((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
// Need to process ComplexWriter function evaluation.
@@ -429,12 +423,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
cg.addExpr(expr);
} else{
// need to do evaluation.
- ValueVector vector = container.addOrGet(outputField, callBack);
+ final ValueVector vector = container.addOrGet(outputField, callBack);
allocationVectors.add(vector);
- TypedFieldId fid = container.getValueVectorId(outputField.getPath());
- boolean useSetSafe = !(vector instanceof FixedWidthVector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
- HoldingContainer hc = cg.addExpr(write);
+ final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+ final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
+ final HoldingContainer hc = cg.addExpr(write);
logger.debug("Added eval for project expression.");
}
@@ -459,12 +453,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return popConfig.getExprs();
}
- List<NamedExpression> exprs = Lists.newArrayList();
- for (MaterializedField field : incoming.getSchema()) {
+ final List<NamedExpression> exprs = Lists.newArrayList();
+ for (final MaterializedField field : incoming.getSchema()) {
if (Types.isComplex(field.getType()) || Types.isRepeated(field.getType())) {
- LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
- String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
- List<LogicalExpression> castArgs = Lists.newArrayList();
+ final LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
+ final String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
+ final List<LogicalExpression> castArgs = Lists.newArrayList();
castArgs.add(convertToJson); //input_expr
/*
* We are implicitly casting to VARCHAR so we don't have a max length,
@@ -472,7 +466,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
* to the actual size so this size doesn't really matter.
*/
castArgs.add(new ValueExpressions.LongExpression(TypeHelper.VARCHAR_DEFAULT_CAST_LEN, null)); //
- FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
+ final FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
exprs.add(new NamedExpression(castCall, new FieldReference(field.getPath())));
} else {
exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath())));
@@ -481,17 +475,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return exprs;
}
- private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+ private boolean isClassificationNeeded(final List<NamedExpression> exprs) {
boolean needed = false;
for (int i = 0; i < exprs.size(); i++) {
final NamedExpression ex = exprs.get(i);
if (!(ex.getExpr() instanceof SchemaPath)) {
continue;
}
- NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
- NameSegment ref = ex.getRef().getRootSegment();
- boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
- boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+ final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+ final NameSegment ref = ex.getRef().getRootSegment();
+ final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+ final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
if (refHasPrefix || exprContainsStar) {
needed = true;
@@ -501,16 +495,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return needed;
}
- private String getUniqueName(String name, ClassifierResult result) {
- Integer currentSeq = (Integer) result.sequenceMap.get(name);
+ private String getUniqueName(final String name, final ClassifierResult result) {
+ final Integer currentSeq = (Integer) result.sequenceMap.get(name);
if (currentSeq == null) { // name is unique, so return the original name
- Integer n = -1;
+ final Integer n = -1;
result.sequenceMap.put(name, n);
return name;
}
// create a new name
- Integer newSeq = currentSeq + 1;
- String newName = name + newSeq;
+ final Integer newSeq = currentSeq + 1;
+ final String newName = name + newSeq;
result.sequenceMap.put(name, newSeq);
result.sequenceMap.put(newName, -1);
@@ -527,7 +521,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
* to ensure uniqueness
* @Param allowDupsWithRename if the original name has been used, is renaming allowed to ensure output name unique
*/
- private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) {
+ private void addToResultMaps(final String origName, final ClassifierResult result, final boolean allowDupsWithRename) {
String name = origName;
if (allowDupsWithRename) {
name = getUniqueName(origName, result);
@@ -540,22 +534,22 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) {
- NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
- NameSegment ref = ex.getRef().getRootSegment();
- boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
- boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
- boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
- boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
- boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
- boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
+ private void classifyExpr(final NamedExpression ex, final RecordBatch incoming, final ClassifierResult result) {
+ final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+ final NameSegment ref = ex.getRef().getRootSegment();
+ final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+ final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+ final boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
+ final boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
+ final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+ final boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
String exprPrefix = EMPTY_STRING;
String exprSuffix = expr.getPath();
if (exprHasPrefix) {
// get the prefix of the expr
- String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+ final String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
assert(exprComponents.length == 2);
exprPrefix = exprComponents[0];
exprSuffix = exprComponents[1];
@@ -565,18 +559,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
boolean exprIsFirstWildcard = false;
if (exprContainsStar) {
result.isStar = true;
- Integer value = (Integer) result.prefixMap.get(exprPrefix);
+ final Integer value = (Integer) result.prefixMap.get(exprPrefix);
if (value == null) {
- Integer n = 1;
+ final Integer n = 1;
result.prefixMap.put(exprPrefix, n);
exprIsFirstWildcard = true;
} else {
- Integer n = value + 1;
+ final Integer n = value + 1;
result.prefixMap.put(exprPrefix, n);
}
}
- int incomingSchemaSize = incoming.getSchema().getFieldCount();
+ final int incomingSchemaSize = incoming.getSchema().getFieldCount();
// for debugging..
// if (incomingSchemaSize > 9) {
@@ -585,16 +579,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// input is '*' and output is 'prefix_*'
if (exprIsStar && refHasPrefix && refEndsWithStar) {
- String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+ final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
assert(components.length == 2);
- String prefix = components[0];
+ final String prefix = components[0];
result.outputNames = Lists.newArrayList();
- for(VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getPath().getRootSegment().getPath();
+ for(final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String name = vvIn.getField().getPath().getRootSegment().getPath();
// add the prefix to the incoming column name
- String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
+ final String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
addToResultMaps(newName, result, false);
}
}
@@ -609,19 +603,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
result.outputNames.add(EMPTY_STRING); // initialize
}
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
// get the prefix of the name
- String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+ final String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
// if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it
if (nameComponents.length <= 1) {
k++;
continue;
}
- String namePrefix = nameComponents[0];
+ final String namePrefix = nameComponents[0];
if (exprPrefix.equals(namePrefix)) {
- String newName = incomingName;
+ final String newName = incomingName;
if (!result.outputMap.containsKey(newName)) {
result.outputNames.set(k, newName);
result.outputMap.put(newName, newName);
@@ -632,9 +626,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
} else {
result.outputNames = Lists.newArrayList();
if (exprContainsStar) {
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
if (refContainsStar) {
addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
} else {
@@ -642,7 +636,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
} else {
- String newName = expr.getPath();
+ final String newName = expr.getPath();
if (!refHasPrefix && !exprHasPrefix) {
addToResultMaps(newName, result, true); // allow dups since this is likely top-level project
} else {
@@ -655,9 +649,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// input is wildcard and it is not the first wildcard
else if(exprIsStar) {
result.outputNames = Lists.newArrayList();
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
}
}
@@ -665,7 +659,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// only the output has prefix
else if (!exprHasPrefix && refHasPrefix) {
result.outputNames = Lists.newArrayList();
- String newName = ref.getPath();
+ final String newName = ref.getPath();
addToResultMaps(newName, result, false);
}
// input has prefix but output does not
@@ -676,24 +670,24 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
result.outputNames.add(EMPTY_STRING); // initialize
}
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getPath().getRootSegment().getPath();
- String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String name = vvIn.getField().getPath().getRootSegment().getPath();
+ final String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
if (components.length <= 1) {
k++;
continue;
}
- String namePrefix = components[0];
- String nameSuffix = components[1];
+ final String namePrefix = components[0];
+ final String nameSuffix = components[1];
if (exprPrefix.equals(namePrefix)) {
if (refContainsStar) {
// remove the prefix from the incoming column names
- String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique
+ final String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique
result.outputNames.set(k, newName);
} else if (exprSuffix.equals(nameSuffix)) {
// example: ref: $f1, expr: T0<PREFIX><column_name>
- String newName = ref.getPath();
+ final String newName = ref.getPath();
result.outputNames.set(k, newName);
}
} else {
@@ -704,7 +698,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// input and output have prefixes although they could be different...
else if (exprHasPrefix && refHasPrefix) {
- String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+ final String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
assert(input.length == 2);
assert false : "Unexpected project expression or reference"; // not handled yet
}
@@ -713,11 +707,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// then we just want to pick the ref name as the output column name
result.outputNames = Lists.newArrayList();
- for (VectorWrapper<?> wrapper : incoming) {
- ValueVector vvIn = wrapper.getValueVector();
- String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+ for (final VectorWrapper<?> wrapper : incoming) {
+ final ValueVector vvIn = wrapper.getValueVector();
+ final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
if (expr.getPath().equals(incomingName)) {
- String newName = ref.getPath();
+ final String newName = ref.getPath();
addToResultMaps(newName, result, true);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 389d668..094865e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -27,9 +27,9 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -53,13 +53,13 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
public class UnorderedReceiverBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
- private RecordBatchLoader batchLoader;
- private RawFragmentBatchProvider fragProvider;
- private FragmentContext context;
+ private final RecordBatchLoader batchLoader;
+ private final RawFragmentBatchProvider fragProvider;
+ private final FragmentContext context;
private BatchSchema schema;
- private OperatorStats stats;
+ private final OperatorStats stats;
private boolean first = true;
- private UnorderedReceiver config;
+ private final UnorderedReceiver config;
OperatorContext oContext;
public enum Metric implements MetricDef {
@@ -72,7 +72,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
}
- public UnorderedReceiverBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws OutOfMemoryException {
+ public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
@@ -101,7 +101,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
@Override
- public void kill(boolean sendUpstream) {
+ public void kill(final boolean sendUpstream) {
if (sendUpstream) {
informSenders();
}
@@ -124,12 +124,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
@Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
+ public TypedFieldId getValueVectorId(final SchemaPath path) {
return batchLoader.getValueVectorId(path);
}
@Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
return batchLoader.getValueAccessorById(clazz, ids);
}
@@ -154,7 +154,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
if (batch == null) {
batchLoader.clear();
- if (context.isCancelled()) {
+ if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
return IterOutcome.NONE;
@@ -167,8 +167,8 @@ public class UnorderedReceiverBatch implements RecordBatch {
// logger.debug("Next received batch {}", batch);
- RecordBatchDef rbd = batch.getHeader().getDef();
- boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+ final RecordBatchDef rbd = batch.getHeader().getDef();
+ final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
batch.release();
@@ -206,15 +206,16 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
private void informSenders() {
- FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ logger.info("Informing senders of request to terminate sending.");
+ final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
.setMajorFragmentId(config.getOppositeMajorFragmentId())
.setQueryId(context.getHandle().getQueryId())
.build();
- for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
- FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+ final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
.setMinorFragmentId(providingEndpoint.getId())
.build();
- FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
.setReceiver(context.getHandle())
.setSender(sender)
.build();
@@ -225,12 +226,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
- public void failed(RpcException ex) {
+ public void failed(final RpcException ex) {
logger.warn("Failed to inform upstream that receiver is finished");
}
@Override
- public void success(Ack value, ByteBuf buffer) {
+ public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index bd3c4e7..95d062c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -347,6 +347,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
mSorter.sort(this.container);
+ // sort may have prematurely exited due to should continue returning false.
+ if (!context.shouldContinue()) {
+ return IterOutcome.STOP;
+ }
sv4 = mSorter.getSV4();
long t = watch.elapsed(TimeUnit.MICROSECONDS);
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 94bc3a3..9b97e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -41,19 +41,20 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
private long compares;
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
private Queue<Integer> newRunStarts;
-
+ private FragmentContext context;
@Override
- public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
+ public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
this.vector4 = vector4.createNewWrapperCurrent();
+ this.context = context;
vector4.clear();
doSetup(context, hyperBatch, null);
runStarts.add(0);
int batch = 0;
for (int i = 0; i < this.vector4.getTotalCount(); i++) {
- int newBatch = this.vector4.get(i) >>> 16;
+ final int newBatch = this.vector4.get(i) >>> 16;
if (newBatch == batch) {
continue;
} else if(newBatch == batch + 1) {
@@ -63,7 +64,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
throw new UnsupportedOperationException("Missing batch");
}
}
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
+ final BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * this.vector4.getTotalCount());
aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
}
@@ -75,12 +76,12 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
* @param recordCount
* @return
*/
- public static long memoryNeeded(int recordCount) {
+ public static long memoryNeeded(final int recordCount) {
// We need 4 bytes (SV4) for each record.
return recordCount * 4;
}
- private int merge(int leftStart, int rightStart, int rightEnd, int outStart) {
+ private int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {
int l = leftStart;
int r = rightStart;
int o = outStart;
@@ -107,17 +108,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
@Override
- public void sort(VectorContainer container) {
- Stopwatch watch = new Stopwatch();
+ public void sort(final VectorContainer container) {
+ final Stopwatch watch = new Stopwatch();
watch.start();
while (runStarts.size() > 1) {
+
+ // check if we're cancelled/failed frequently
+ if (!context.shouldContinue()) {
+ return;
+ }
+
int outIndex = 0;
newRunStarts = Queues.newLinkedBlockingQueue();
newRunStarts.add(outIndex);
- int size = runStarts.size();
+ final int size = runStarts.size();
for (int i = 0; i < size / 2; i++) {
- int left = runStarts.poll();
- int right = runStarts.poll();
+ final int left = runStarts.poll();
+ final int right = runStarts.poll();
Integer end = runStarts.peek();
if (end == null) {
end = vector4.getTotalCount();
@@ -130,7 +137,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
if (outIndex < vector4.getTotalCount()) {
copyRun(outIndex, vector4.getTotalCount());
}
- SelectionVector4 tmp = aux.createNewWrapperCurrent();
+ final SelectionVector4 tmp = aux.createNewWrapperCurrent();
aux.clear();
aux = this.vector4.createNewWrapperCurrent();
vector4.clear();
@@ -141,23 +148,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
aux.clear();
}
- private void copyRun(int start, int end) {
+ private void copyRun(final int start, final int end) {
for (int i = start; i < end; i++) {
aux.set(i, vector4.get(i));
}
}
@Override
- public void swap(int sv0, int sv1) {
- int tmp = vector4.get(sv0);
+ public void swap(final int sv0, final int sv1) {
+ final int tmp = vector4.get(sv0);
vector4.set(sv0, vector4.get(sv1));
vector4.set(sv1, tmp);
}
@Override
- public int compare(int leftIndex, int rightIndex) {
- int sv1 = vector4.get(leftIndex);
- int sv2 = vector4.get(rightIndex);
+ public int compare(final int leftIndex, final int rightIndex) {
+ final int sv1 = vector4.get(leftIndex);
+ final int sv2 = vector4.get(rightIndex);
compares++;
return doEval(sv1, sv2);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index 1aadaa2..5e0cd82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.proto.helper;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -29,22 +28,32 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
public class QueryIdHelper {
/* Generate a UUID from the two parts of the queryid */
- public static String getQueryId(QueryId queryId) {
+ public static String getQueryId(final QueryId queryId) {
return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
}
- public static QueryId getQueryIdFromString(String queryId) {
- UUID uuid = UUID.fromString(queryId);
+ public static QueryId getQueryIdFromString(final String queryId) {
+ final UUID uuid = UUID.fromString(queryId);
return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
}
- public static String getQueryIdentifier(FragmentHandle h) {
+ public static String getQueryIdentifier(final FragmentHandle h) {
return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
}
- public static String getQueryIdentifiers(QueryId queryId, int majorFragmentId, List<Integer> minorFragmentIds) {
- String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
+ public static String getExecutorThreadName(final FragmentHandle fragmentHandle) {
+ return String.format("%s:frag:%s:%s",
+ getQueryId(fragmentHandle.getQueryId()),
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+ }
+
+ public static String getQueryIdentifiers(final QueryId queryId, final int majorFragmentId, final List<Integer> minorFragmentIds) {
+ final String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
return getQueryId(queryId) + ":" + majorFragmentId + ":" + fragmentIds;
}
+ public static String getFragmentId(final FragmentHandle fragmentHandle) {
+ return fragmentHandle.getMajorFragmentId() + ":" + fragmentHandle.getMinorFragmentId();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 2bb29e5..215f580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -41,15 +41,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected BatchState state;
- protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
+ protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
this(popConfig, context, true, new OperatorContext(popConfig, context, true));
}
- protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException {
+ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
}
- protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema, OperatorContext oContext) throws OutOfMemoryException {
+ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException {
super();
this.context = context;
this.popConfig = popConfig;
@@ -84,16 +84,18 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return popConfig;
}
- public final IterOutcome next(RecordBatch b) {
-
+ public final IterOutcome next(final RecordBatch b) {
+ if(!context.shouldContinue()) {
+ return IterOutcome.STOP;
+ }
return next(0, b);
}
- public final IterOutcome next(int inputIndex, RecordBatch b){
+ public final IterOutcome next(final int inputIndex, final RecordBatch b){
IterOutcome next = null;
stats.stopProcessing();
try{
- if (context.isCancelled()) {
+ if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
next = b.next();
@@ -141,7 +143,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
default:
return innerNext();
}
- } catch (SchemaChangeException e) {
+ } catch (final SchemaChangeException e) {
throw new DrillRuntimeException(e);
} finally {
stats.stopProcessing();
@@ -159,7 +161,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
@Override
- public void kill(boolean sendUpstream) {
+ public void kill(final boolean sendUpstream) {
killIncoming(sendUpstream);
}
@@ -182,12 +184,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
@Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
+ public TypedFieldId getValueVectorId(final SchemaPath path) {
return container.getValueVectorId(path);
}
@Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
return container.getValueAccessorById(clazz, ids);
}
@@ -195,7 +197,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
@Override
public WritableBatch getWritableBatch() {
// logger.debug("Getting writable batch.");
- WritableBatch batch = WritableBatch.get(this);
+ final WritableBatch batch = WritableBatch.get(this);
return batch;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 7d157fe..4f99081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -19,15 +19,9 @@ package org.apache.drill.exec.record;
import io.netty.buffer.ByteBuf;
-import java.util.List;
-
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-
-import com.google.common.collect.Lists;
public class FragmentWritableBatch{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
@@ -37,26 +31,25 @@ public class FragmentWritableBatch{
private final ByteBuf[] buffers;
private final FragmentRecordBatch header;
- public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
+ public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
}
- public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds, WritableBatch batch){
+ public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
}
- private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentId, RecordBatchDef def, ByteBuf... buffers){
+ private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
this.buffers = buffers;
- FragmentRecordBatch.Builder builder = FragmentRecordBatch //
- .newBuilder() //
- .setIsLastBatch(isLast) //
- .setDef(def) //
+ final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+ .setIsLastBatch(isLast)
+ .setDef(def)
.setQueryId(queryId)
- .setReceivingMajorFragmentId(receiveMajorFragmentId) //
- .setSendingMajorFragmentId(sendMajorFragmentId) //
+ .setReceivingMajorFragmentId(receiveMajorFragmentId)
+ .setSendingMajorFragmentId(sendMajorFragmentId)
.setSendingMinorFragmentId(sendMinorFragmentId);
- for(int i : receiveMinorFragmentId){
+ for(final int i : receiveMinorFragmentId){
builder.addReceivingMinorFragmentId(i);
}
@@ -64,31 +57,32 @@ public class FragmentWritableBatch{
}
- public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){
+ public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
}
- public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds){
+ public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
}
- public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId,
- int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+ public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
+ final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
receiveMinorFragmentId, schema);
}
- public static FragmentWritableBatch getEmptyBatchWithSchema(boolean isLast, QueryId queryId, int sendMajorFragmentId,
- int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+ public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
+ final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
- List<SerializedField> fields = Lists.newArrayList();
- for (MaterializedField field : schema) {
- fields.add(field.getSerializedField());
+ final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+ if (schema != null) {
+ for (final MaterializedField field : schema) {
+ def.addField(field.getSerializedField());
+ }
}
- RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
- new int[]{receiveMinorFragmentId}, def);
+ new int[] { receiveMinorFragmentId }, def.build());
}
public ByteBuf[] getBuffers(){
@@ -97,7 +91,7 @@ public class FragmentWritableBatch{
public long getByteCount() {
long n = 0;
- for (ByteBuf buf : buffers) {
+ for (final ByteBuf buf : buffers) {
n += buf.readableBytes();
}
return n;
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index 33e0665..e18b94c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -19,14 +19,14 @@ package org.apache.drill.exec.rpc.data;
import io.netty.buffer.DrillBuf;
+import java.io.IOException;
+
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentManager;
-import java.io.IOException;
-
public class DataResponseHandlerImpl implements DataResponseHandler{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
private final WorkerBee bee;
@@ -45,7 +45,7 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
// logger.debug("Fragment Batch received {}", fragmentBatch);
- boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
+ final boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
if (canRun) {
// logger.debug("Arriving batch means local batch can run, starting local batch.");
/*
@@ -54,10 +54,5 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
*/
bee.startFragmentPendingRemote(manager);
}
- if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
-// logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
-// manager.isWaiting());
- bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
- }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c15bb7c..e7a9a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -76,13 +76,13 @@ public class Drillbit implements AutoCloseable {
Drillbit bit;
try {
bit = new Drillbit(config, remoteServiceSet);
- } catch (Exception ex) {
+ } catch (final Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}
try {
bit.run();
- } catch (Exception e) {
+ } catch (final Exception e) {
bit.close();
throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
}
@@ -131,7 +131,7 @@ public class Drillbit implements AutoCloseable {
// parse out the properties, validate, and then set them
final String systemProps[] = allSystemProps.split(",");
- for(String systemProp : systemProps) {
+ for(final String systemProp : systemProps) {
final String keyValue[] = systemProp.split("=");
if (keyValue.length != 2) {
throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
@@ -162,7 +162,7 @@ public class Drillbit implements AutoCloseable {
}
public static void main(final String[] cli) throws DrillbitStartupException {
- StartupOptions options = StartupOptions.parse(cli);
+ final StartupOptions options = StartupOptions.parse(cli);
start(options);
}
@@ -174,7 +174,7 @@ public class Drillbit implements AutoCloseable {
private final Server embeddedJetty;
private RegistrationHandle registrationHandle;
- public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+ public Drillbit(final DrillConfig config, final RemoteServiceSet serviceSet) throws Exception {
final long startTime = System.currentTimeMillis();
logger.debug("Construction started.");
final boolean allowPortHunting = serviceSet != null;
@@ -269,14 +269,14 @@ public class Drillbit implements AutoCloseable {
try {
Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted while sleeping during coordination deregistration.");
}
if (embeddedJetty != null) {
try {
embeddedJetty.stop();
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn("Failure while shutting down embedded jetty server.");
}
}
@@ -323,7 +323,7 @@ public class Drillbit implements AutoCloseable {
logger.info("Received shutdown request.");
try {
drillbit.close();
- } catch(Exception e) {
+ } catch(final Exception e) {
throw new RuntimeException("Caught exception closing Drillbit started from\n" + stackTrace, e);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
index e9024ff..20f76a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -26,67 +26,79 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
interface Comparators {
final static Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
- public int compare(MajorFragmentProfile o1, MajorFragmentProfile o2) {
+ public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) {
return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
}
};
final static Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
}
};
final static Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getStartTime(), o2.getStartTime());
}
};
+ final static Comparator<MinorFragmentProfile> lastUpdateCompare = new Comparator<MinorFragmentProfile>() {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+ return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
+ }
+ };
+
+ final static Comparator<MinorFragmentProfile> lastProgressCompare = new Comparator<MinorFragmentProfile>() {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+ return Long.compare(o1.getLastProgress(), o2.getLastProgress());
+ }
+ };
+
final static Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getEndTime(), o2.getEndTime());
}
};
final static Comparator<MinorFragmentProfile> fragPeakMemAllocated = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed());
}
};
final static Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
- public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+ public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
}
};
final static Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
- public int compare(OperatorProfile o1, OperatorProfile o2) {
+ public int compare(final OperatorProfile o1, final OperatorProfile o2) {
return Long.compare(o1.getOperatorId(), o2.getOperatorId());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+ public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+ public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+ public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> opPeakMem = new Comparator<Pair<OperatorProfile, Integer>>() {
- public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+ public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated());
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
index 3a66327..b245b30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
@@ -32,7 +32,7 @@ public class FragmentWrapper {
private final MajorFragmentProfile major;
private final long start;
- public FragmentWrapper(MajorFragmentProfile major, long start) {
+ public FragmentWrapper(final MajorFragmentProfile major, final long start) {
this.major = Preconditions.checkNotNull(major);
this.start = start;
}
@@ -45,44 +45,51 @@ public class FragmentWrapper {
return String.format("fragment-%s", major.getMajorFragmentId());
}
- public void addSummary(TableBuilder tb) {
+ public void addSummary(final TableBuilder tb, final int colCount) {
final String fmt = " (%d)";
- long t0 = start;
+ final long t0 = start;
- ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
+ final ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
if (complete.size() < 1) {
- tb.appendRepeated("", null, 7);
+ tb.appendRepeated("", null, colCount - 2);
return;
}
- int li = complete.size() - 1;
+ final MinorFragmentProfile firstStart = Collections.min(complete, Comparators.startTimeCompare);
+ final MinorFragmentProfile lastStart = Collections.max(complete, Comparators.startTimeCompare);
+ tb.appendMillis(firstStart.getStartTime() - t0, String.format(fmt, firstStart.getMinorFragmentId()));
+ tb.appendMillis(lastStart.getStartTime() - t0, String.format(fmt, lastStart.getMinorFragmentId()));
- Collections.sort(complete, Comparators.startTimeCompare);
- tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
- tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
- Collections.sort(complete, Comparators.endTimeCompare);
- tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
- tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+ final MinorFragmentProfile firstEnd = Collections.min(complete, Comparators.endTimeCompare);
+ final MinorFragmentProfile lastEnd = Collections.max(complete, Comparators.endTimeCompare);
+ tb.appendMillis(firstEnd.getEndTime() - t0, String.format(fmt, firstEnd.getMinorFragmentId()));
+ tb.appendMillis(lastEnd.getEndTime() - t0, String.format(fmt, lastEnd.getMinorFragmentId()));
long total = 0;
- for (MinorFragmentProfile p : complete) {
+ for (final MinorFragmentProfile p : complete) {
total += p.getEndTime() - p.getStartTime();
}
- Collections.sort(complete, Comparators.runTimeCompare);
- tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
- String.format(fmt, complete.get(0).getMinorFragmentId()));
+
+ final MinorFragmentProfile shortRun = Collections.min(complete, Comparators.endTimeCompare);
+ final MinorFragmentProfile longRun = Collections.max(complete, Comparators.endTimeCompare);
+
+ tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime(), String.format(fmt, shortRun.getMinorFragmentId()));
tb.appendMillis((long) (total / complete.size()), null);
- tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
- String.format(fmt, complete.get(li).getMinorFragmentId()));
+ tb.appendMillis(longRun.getEndTime() - longRun.getStartTime(), String.format(fmt, longRun.getMinorFragmentId()));
- Collections.sort(complete, Comparators.fragPeakMemAllocated);
- tb.appendBytes(complete.get(li).getMaxMemoryUsed(), null);
+ final MinorFragmentProfile lastUpdate = Collections.max(complete, Comparators.lastUpdateCompare);
+ tb.appendTime(lastUpdate.getLastUpdate(), null);
+
+ final MinorFragmentProfile lastProgress = Collections.max(complete, Comparators.lastProgressCompare);
+ tb.appendTime(lastProgress.getLastProgress(), null);
+
+ final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragPeakMemAllocated);
+ tb.appendBytes(maxMem.getMaxMemoryUsed(), null);
}
public String getContent() {
@@ -90,9 +97,10 @@ public class FragmentWrapper {
}
- public String majorFragmentTimingProfile(MajorFragmentProfile major) {
- final String[] columns = {"Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches", "Peak Memory", "State"};
- TableBuilder builder = new TableBuilder(columns);
+ public String majorFragmentTimingProfile(final MajorFragmentProfile major) {
+ final String[] columns = { "Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches",
+ "Last Update", "Last Progress", "Peak Memory", "State" };
+ final TableBuilder builder = new TableBuilder(columns);
ArrayList<MinorFragmentProfile> complete, incomplete;
complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
@@ -101,17 +109,17 @@ public class FragmentWrapper {
major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
Collections.sort(complete, Comparators.minorIdCompare);
- for (MinorFragmentProfile minor : complete) {
- ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
+ for (final MinorFragmentProfile minor : complete) {
+ final ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
- long t0 = start;
+ final long t0 = start;
long biggestIncomingRecords = 0;
long biggestBatches = 0;
- for (OperatorProfile op : ops) {
+ for (final OperatorProfile op : ops) {
long incomingRecords = 0;
long batches = 0;
- for (StreamProfile sp : op.getInputProfileList()) {
+ for (final StreamProfile sp : op.getInputProfileList()) {
incomingRecords += sp.getRecords();
batches += sp.getBatches();
}
@@ -127,14 +135,17 @@ public class FragmentWrapper {
builder.appendFormattedInteger(biggestIncomingRecords, null);
builder.appendFormattedInteger(biggestBatches, null);
+
+ builder.appendTime(minor.getLastUpdate(), null);
+ builder.appendTime(minor.getLastProgress(), null);
builder.appendBytes(minor.getMaxMemoryUsed(), null);
builder.appendCell(minor.getState().name(), null);
}
- for (MinorFragmentProfile m : incomplete) {
+ for (final MinorFragmentProfile m : incomplete) {
builder.appendCell(
major.getMajorFragmentId() + "-"
+ m.getMinorFragmentId(), null);
- builder.appendRepeated(m.getState().toString(), null, 6);
+ builder.appendRepeated(m.getState().toString(), null, columns.length - 1);
}
return builder.toString();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
index 4f4fcdb..7a1d9b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -81,7 +81,6 @@ public class OperatorWrapper {
CoreOperatorType operatorType = CoreOperatorType.valueOf(ops.get(0).getLeft().getOperatorType());
tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null);
- int li = ops.size() - 1;
String fmt = " (%s)";
double setupSum = 0.0;
@@ -95,23 +94,26 @@ public class OperatorWrapper {
memSum += ip.getLeft().getPeakLocalMemoryAllocated();
}
- Collections.sort(ops, Comparators.setupTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
+ final ImmutablePair<OperatorProfile, Integer> shortSetup = Collections.min(ops, Comparators.setupTimeSort);
+ final ImmutablePair<OperatorProfile, Integer> longSetup = Collections.max(ops, Comparators.setupTimeSort);
+ tb.appendNanos(shortSetup.getLeft().getSetupNanos(), String.format(fmt, shortSetup.getRight()));
tb.appendNanos((long) (setupSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
+ tb.appendNanos(longSetup.getLeft().getSetupNanos(), String.format(fmt, longSetup.getRight()));
- Collections.sort(ops, Comparators.processTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
+ final ImmutablePair<OperatorProfile, Integer> shortProcess = Collections.min(ops, Comparators.processTimeSort);
+ final ImmutablePair<OperatorProfile, Integer> longProcess = Collections.max(ops, Comparators.processTimeSort);
+ tb.appendNanos(shortProcess.getLeft().getProcessNanos(), String.format(fmt, shortProcess.getRight()));
tb.appendNanos((long) (processSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
+ tb.appendNanos(longProcess.getLeft().getProcessNanos(), String.format(fmt, longProcess.getRight()));
- Collections.sort(ops, Comparators.waitTimeSort);
- tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
+ final ImmutablePair<OperatorProfile, Integer> shortWait = Collections.min(ops, Comparators.waitTimeSort);
+ final ImmutablePair<OperatorProfile, Integer> longWait = Collections.max(ops, Comparators.waitTimeSort);
+ tb.appendNanos(shortWait.getLeft().getWaitNanos(), String.format(fmt, shortWait.getRight()));
tb.appendNanos((long) (waitSum / ops.size()), null);
- tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
+ tb.appendNanos(longWait.getLeft().getWaitNanos(), String.format(fmt, longWait.getRight()));
- Collections.sort(ops, Comparators.opPeakMem);
+ final ImmutablePair<OperatorProfile, Integer> peakMem = Collections.max(ops, Comparators.opPeakMem);
tb.appendBytes((long) (memSum / ops.size()), null);
- tb.appendBytes(ops.get(li).getLeft().getPeakLocalMemoryAllocated(), null);
+ tb.appendBytes(peakMem.getLeft().getPeakLocalMemoryAllocated(), null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 80016aa..479e655 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -38,7 +38,7 @@ public class ProfileWrapper {
public QueryProfile profile;
public String id;
- public ProfileWrapper(QueryProfile profile) {
+ public ProfileWrapper(final QueryProfile profile) {
this.profile = profile;
this.id = QueryIdHelper.getQueryId(profile.getId());
}
@@ -56,25 +56,25 @@ public class ProfileWrapper {
}
public List<OperatorWrapper> getOperatorProfiles() {
- List<OperatorWrapper> ows = Lists.newArrayList();
- Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
+ final List<OperatorWrapper> ows = Lists.newArrayList();
+ final Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
- List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+ final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
Collections.sort(majors, Comparators.majorIdCompare);
- for (MajorFragmentProfile major : majors) {
+ for (final MajorFragmentProfile major : majors) {
- List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
+ final List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
Collections.sort(minors, Comparators.minorIdCompare);
- for (MinorFragmentProfile minor : minors) {
+ for (final MinorFragmentProfile minor : minors) {
- List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
+ final List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
Collections.sort(ops, Comparators.operatorIdCompare);
- for (OperatorProfile op : ops) {
+ for (final OperatorProfile op : ops) {
- ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
+ final ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
major.getMajorFragmentId(), op.getOperatorId());
if (!opmap.containsKey(ip)) {
- List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
+ final List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
opmap.put(ip, l);
}
opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
@@ -82,10 +82,10 @@ public class ProfileWrapper {
}
}
- List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
+ final List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
Collections.sort(keys);
- for (ImmutablePair<Integer, Integer> ip : keys) {
+ for (final ImmutablePair<Integer, Integer> ip : keys) {
ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
}
@@ -93,11 +93,11 @@ public class ProfileWrapper {
}
public List<FragmentWrapper> getFragmentProfiles() {
- List<FragmentWrapper> fws = Lists.newArrayList();
+ final List<FragmentWrapper> fws = Lists.newArrayList();
- List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+ final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
Collections.sort(majors, Comparators.majorIdCompare);
- for (MajorFragmentProfile major : majors) {
+ for (final MajorFragmentProfile major : majors) {
fws.add(new FragmentWrapper(major, profile.getStart()));
}
@@ -105,10 +105,11 @@ public class ProfileWrapper {
}
public String getFragmentsOverview() {
- final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax", "memmax"};
- TableBuilder tb = new TableBuilder(columns);
- for (FragmentWrapper fw : getFragmentProfiles()) {
- fw.addSummary(tb);
+ final String[] columns = { "Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End",
+ "Last End", "tmin", "tavg", "tmax", "last update", "last progress", "memmax" };
+ final TableBuilder tb = new TableBuilder(columns);
+ for (final FragmentWrapper fw : getFragmentProfiles()) {
+ fw.addSummary(tb, columns.length);
}
return tb.toString();
}
@@ -117,17 +118,17 @@ public class ProfileWrapper {
public String getOperatorsOverview() {
final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)", "Mem (avg)", "Mem (max)"};
- TableBuilder tb = new TableBuilder(columns);
- for (OperatorWrapper ow : getOperatorProfiles()) {
+ final TableBuilder tb = new TableBuilder(columns);
+ for (final OperatorWrapper ow : getOperatorProfiles()) {
ow.addSummary(tb);
}
return tb.toString();
}
public String getOperatorsJSON() {
- StringBuilder sb = new StringBuilder("{");
+ final StringBuilder sb = new StringBuilder("{");
String sep = "";
- for (CoreOperatorType op : CoreOperatorType.values()) {
+ for (final CoreOperatorType op : CoreOperatorType.values()) {
sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
sep = ", ";
}
[4/7] drill git commit: DRILL-2762: Update Fragment state reporting
and error collection
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
index 72f4436..e91404f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
@@ -25,23 +25,25 @@ import java.util.Date;
import java.util.Locale;
class TableBuilder {
- NumberFormat format = NumberFormat.getInstance(Locale.US);
- SimpleDateFormat hours = new SimpleDateFormat("HH:mm");
- SimpleDateFormat shours = new SimpleDateFormat("H:mm");
- SimpleDateFormat mins = new SimpleDateFormat("mm:ss");
- SimpleDateFormat smins = new SimpleDateFormat("m:ss");
-
- SimpleDateFormat secs = new SimpleDateFormat("ss.SSS");
- SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS");
- DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
- DecimalFormat dec = new DecimalFormat("0.00");
- DecimalFormat intformat = new DecimalFormat("#,###");
-
- StringBuilder sb;
- int w = 0;
- int width;
-
- public TableBuilder(String[] columns) {
+ private final NumberFormat format = NumberFormat.getInstance(Locale.US);
+ private final SimpleDateFormat days = new SimpleDateFormat("DD'd'hh'h'mm'm'");
+ private final SimpleDateFormat sdays = new SimpleDateFormat("DD'd'hh'h'mm'm'");
+ private final SimpleDateFormat hours = new SimpleDateFormat("HH'h'mm'm'");
+ private final SimpleDateFormat shours = new SimpleDateFormat("H'h'mm'm'");
+ private final SimpleDateFormat mins = new SimpleDateFormat("mm'm'ss's'");
+ private final SimpleDateFormat smins = new SimpleDateFormat("m'm'ss's'");
+
+ private final SimpleDateFormat secs = new SimpleDateFormat("ss.SSS's'");
+ private final SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS's'");
+ private final DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
+ private final DecimalFormat dec = new DecimalFormat("0.00");
+ private final DecimalFormat intformat = new DecimalFormat("#,###");
+
+ private StringBuilder sb;
+ private int w = 0;
+ private int width;
+
+ public TableBuilder(final String[] columns) {
sb = new StringBuilder();
width = columns.length;
@@ -49,13 +51,13 @@ class TableBuilder {
format.setMinimumFractionDigits(3);
sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
- for (String cn : columns) {
+ for (final String cn : columns) {
sb.append("<th>" + cn + "</th>");
}
sb.append("</tr>\n");
}
- public void appendCell(String s, String link) {
+ public void appendCell(final String s, final String link) {
if (w == 0) {
sb.append("<tr>");
}
@@ -66,22 +68,27 @@ class TableBuilder {
}
}
- public void appendRepeated(String s, String link, int n) {
+ public void appendRepeated(final String s, final String link, final int n) {
for (int i = 0; i < n; i++) {
appendCell(s, link);
}
}
- public void appendTime(long d, String link) {
+ public void appendTime(final long d, final String link) {
appendCell(dateFormat.format(d), link);
}
- public void appendMillis(long p, String link) {
- double secs = p/1000.0;
- double mins = secs/60;
- double hours = mins/60;
+ public void appendMillis(final long p, final String link) {
+ final double secs = p/1000.0;
+ final double mins = secs/60;
+ final double hours = mins/60;
+ final double days = hours / 24;
SimpleDateFormat timeFormat = null;
- if(hours >= 10){
+ if (days >= 10) {
+ timeFormat = this.days;
+ } else if (days >= 1) {
+ timeFormat = this.sdays;
+ } else if (hours >= 10) {
timeFormat = this.hours;
}else if(hours >= 1){
timeFormat = this.shours;
@@ -97,30 +104,30 @@ class TableBuilder {
appendCell(timeFormat.format(new Date(p)), null);
}
- public void appendNanos(long p, String link) {
+ public void appendNanos(final long p, final String link) {
appendMillis((long) (p / 1000.0 / 1000.0), link);
}
- public void appendFormattedNumber(Number n, String link) {
+ public void appendFormattedNumber(final Number n, final String link) {
appendCell(format.format(n), link);
}
- public void appendFormattedInteger(long n, String link) {
+ public void appendFormattedInteger(final long n, final String link) {
appendCell(intformat.format(n), link);
}
- public void appendInteger(long l, String link) {
+ public void appendInteger(final long l, final String link) {
appendCell(Long.toString(l), link);
}
- public void appendBytes(long l, String link){
+ public void appendBytes(final long l, final String link){
appendCell(bytePrint(l), link);
}
- private String bytePrint(long size){
- double m = size/Math.pow(1024, 2);
- double g = size/Math.pow(1024, 3);
- double t = size/Math.pow(1024, 4);
+ private String bytePrint(final long size){
+ final double m = size/Math.pow(1024, 2);
+ final double g = size/Math.pow(1024, 3);
+ final double t = size/Math.pow(1024, 4);
if (t > 1) {
return dec.format(t).concat("TB");
} else if (g > 1) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e2bcec3..a3ceb8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -23,7 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.SelfCleaningRunnable;
@@ -47,6 +48,7 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
@@ -101,7 +103,17 @@ public class WorkManager implements AutoCloseable {
* threads that can be created. Ideally, this might be computed based on the number of cores or
* some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice.
*/
- executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+ executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory("WorkManager-")) {
+ @Override
+ protected void afterExecute(final Runnable r, final Throwable t) {
+ if(t != null){
+ logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
+ }
+ super.afterExecute(r, t);
+ }
+ };
+
// TODO references to this escape here (via WorkerBee) before construction is done
controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
@@ -125,7 +137,7 @@ public class WorkManager implements AutoCloseable {
return runningFragments.size();
}
});
- } catch (IllegalArgumentException e) {
+ } catch (final IllegalArgumentException e) {
logger.warn("Exception while registering metrics", e);
}
}
@@ -160,7 +172,7 @@ public class WorkManager implements AutoCloseable {
if (executor != null) {
executor.awaitTermination(1, TimeUnit.SECONDS);
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Executor interrupted while awaiting termination");
}
}
@@ -188,7 +200,7 @@ public class WorkManager implements AutoCloseable {
while(true) {
try {
exitLatch.await(5, TimeUnit.SECONDS);
- } catch(InterruptedException e) {
+ } catch(final InterruptedException e) {
// keep waiting
}
break;
@@ -263,6 +275,7 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
+ workBus.removeFragmentManager(fragmentHandle);
indicateIfSafeToExit();
}
});
@@ -289,28 +302,29 @@ public class WorkManager implements AutoCloseable {
@Override
public void run() {
while(true) {
+ final Controller controller = dContext.getController();
final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
- for(FragmentExecutor fragmentExecutor : runningFragments.values()) {
+ for(final FragmentExecutor fragmentExecutor : runningFragments.values()) {
final FragmentStatus status = fragmentExecutor.getStatus();
if (status == null) {
continue;
}
final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint();
- futures.add(dContext.getController().getTunnel(ep).sendFragmentStatus(status));
+ futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
}
- for(DrillRpcFuture<Ack> future : futures) {
+ for(final DrillRpcFuture<Ack> future : futures) {
try {
future.checkedGet();
- } catch(RpcException ex) {
+ } catch(final RpcException ex) {
logger.info("Failure while sending intermediate fragment status to Foreman", ex);
}
}
try {
Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
- } catch(InterruptedException e) {
+ } catch(final InterruptedException e) {
// exit status thread on interrupt.
break;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3a7123d..3fb0775 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcConstants;
@@ -137,10 +138,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
drillbitContext.getWorkBus().addFragmentManager(manager);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new UserRpcException(drillbitContext.getEndpoint(),
"Failure while trying to start remote fragment", e);
- } catch (OutOfMemoryError t) {
+ } catch (final OutOfMemoryError t) {
if (t.getMessage().startsWith("Direct buffer")) {
throw new UserRpcException(drillbitContext.getEndpoint(),
"Out of direct memory while trying to start remote fragment", t);
@@ -171,19 +172,24 @@ public class ControlHandlerImpl implements ControlMessageHandler {
}
private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
final FragmentManager manager =
bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
FragmentExecutor executor;
if (manager != null) {
- executor = manager.getRunnable();
+ manager.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
// then try local cancel.
executor = bee.getFragmentRunner(finishedReceiver.getSender());
- }
-
- if (executor != null) {
- executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+ if (executor != null) {
+ executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+ } else {
+ logger.warn(
+ "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+ QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+ QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+ }
}
return Acks.OK;
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 2430e64..85262de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -47,10 +47,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
private final ResponseSenderQueue readController = new ResponseSenderQueue();
private int streamCounter;
- private int fragmentCount;
- private FragmentContext context;
+ private final int fragmentCount;
+ private final FragmentContext context;
- public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
+ public UnlimitedRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
this.softlimit = bufferSizePerSocket * fragmentCount;
@@ -63,7 +63,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
@Override
- public void enqueue(RawFragmentBatch batch) throws IOException {
+ public void enqueue(final RawFragmentBatch batch) throws IOException {
+
+ // If this fragment is already canceled or failed, we don't need any more batches. We do the null check to
+ // ensure that tests run.
+ if (context != null && !context.shouldContinue()) {
+ this.kill(context);
+ }
+
if (isFinished()) {
if (state == BufferState.KILLED) {
// do not even enqueue just release and send ack back
@@ -76,8 +83,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
if (batch.getHeader().getIsOutOfMemory()) {
logger.trace("Setting autoread false");
- RawFragmentBatch firstBatch = buffer.peekFirst();
- FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
+ final RawFragmentBatch firstBatch = buffer.peekFirst();
+ final FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) {
buffer.addFirst(batch);
}
@@ -96,16 +103,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void cleanup() {
- if (!isFinished() && !context.isCancelled()) {
- String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
+ if (!isFinished() && context.shouldContinue()) {
+ final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
logger.error(msg);
- IllegalStateException e = new IllegalStateException(msg);
+ final IllegalStateException e = new IllegalStateException(msg);
context.fail(e);
throw e;
}
if (!buffer.isEmpty()) {
- if (!context.isFailed() && !context.isCancelled()) {
+ if (context.shouldContinue()) {
context.fail(new IllegalStateException("Batches still in queue during cleanup"));
logger.error("{} Batches in queue.", buffer.size());
}
@@ -114,7 +121,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
@Override
- public void kill(FragmentContext context) {
+ public void kill(final FragmentContext context) {
state = BufferState.KILLED;
clearBufferWithBody();
}
@@ -125,7 +132,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
*/
private void clearBufferWithBody() {
while (!buffer.isEmpty()) {
- RawFragmentBatch batch = buffer.poll();
+ final RawFragmentBatch batch = buffer.poll();
if (batch.getBody() != null) {
batch.getBody().release();
}
@@ -160,7 +167,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
if (b == null && (!isFinished() || !buffer.isEmpty())) {
try {
b = buffer.take();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
return null;
// TODO InterruptedException
}
@@ -175,9 +182,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
// try to flush the difference between softlimit and queue size, so every flush we are reducing backlog
// when queue size is lower than softlimit - the bigger the difference the more we can flush
if (!isFinished() && overlimit.get()) {
- int flushCount = softlimit - buffer.size();
+ final int flushCount = softlimit - buffer.size();
if ( flushCount > 0 ) {
- int flushed = readController.flushResponses(flushCount);
+ final int flushed = readController.flushResponses(flushCount);
logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
if ( flushed == 0 ) {
// queue is empty - nothing to do for now
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index f824b53..d94b9f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -23,15 +23,15 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-
import org.apache.drill.common.EventProcessor;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
@@ -77,8 +77,10 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.codehaus.jackson.map.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
@@ -99,6 +101,7 @@ import com.google.common.collect.Multimap;
public class Foreman implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+ private static final int RPC_WAIT_IN_SECONDS = 90;
private final QueryId queryId;
private final RunQuery queryRequest;
@@ -113,7 +116,7 @@ public class Foreman implements Runnable {
private FragmentExecutor rootRunner; // root Fragment
- private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+ private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
@@ -204,12 +207,12 @@ public class Foreman implements Runnable {
throw new IllegalStateException();
}
injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
- } catch (ForemanException e) {
+ } catch (final ForemanException e) {
moveToState(QueryState.FAILED, e);
} catch (AssertionError | Exception ex) {
moveToState(QueryState.FAILED,
new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
- } catch (OutOfMemoryError e) {
+ } catch (final OutOfMemoryError e) {
/*
* FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
* So, if we die here, they should get notified about that, and cancel themselves; we don't have to
@@ -255,9 +258,9 @@ public class Foreman implements Runnable {
try {
lease.close();
lease = null;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
// if we end up here, the while loop will try again
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn("Failure while releasing lease.", e);
break;
}
@@ -268,7 +271,7 @@ public class Foreman implements Runnable {
LogicalPlan logicalPlan;
try {
logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ForemanException("Failure parsing logical plan.", e);
}
@@ -299,9 +302,9 @@ public class Foreman implements Runnable {
private void log(final PhysicalPlan plan) {
if (logger.isDebugEnabled()) {
try {
- String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
+ final String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
logger.debug("Physical {}", planText);
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.warn("Error while attempting to log physical plan.", e);
}
}
@@ -324,7 +327,7 @@ public class Foreman implements Runnable {
try {
final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
runPhysicalPlan(plan);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ForemanSetupException("Failure while parsing physical plan.", e);
}
}
@@ -339,8 +342,8 @@ public class Foreman implements Runnable {
final PlanFragment rootPlanFragment = work.getRootFragment();
assert queryId == rootPlanFragment.getHandle().getQueryId();
- drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
- drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+ drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+ drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
logger.debug("Submitting fragments to run.");
@@ -365,7 +368,7 @@ public class Foreman implements Runnable {
private void setupSortMemoryAllocations(final PhysicalPlan plan) {
// look for external sorts
final List<ExternalSort> sortList = new LinkedList<>();
- for (PhysicalOperator op : plan.getSortedOperators()) {
+ for (final PhysicalOperator op : plan.getSortedOperators()) {
if (op instanceof ExternalSort) {
sortList.add((ExternalSort) op);
}
@@ -382,7 +385,7 @@ public class Foreman implements Runnable {
final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
logger.debug("Max sort alloc: {}", maxSortAlloc);
- for(ExternalSort externalSort : sortList) {
+ for(final ExternalSort externalSort : sortList) {
externalSort.setMaxAllocation(maxSortAlloc);
}
}
@@ -403,7 +406,7 @@ public class Foreman implements Runnable {
if (queuingEnabled) {
final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
double totalCost = 0;
- for (PhysicalOperator ops : plan.getSortedOperators()) {
+ for (final PhysicalOperator ops : plan.getSortedOperators()) {
totalCost += ops.getCost();
}
@@ -423,7 +426,7 @@ public class Foreman implements Runnable {
final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ForemanSetupException("Unable to acquire slot for query.", e);
}
}
@@ -447,7 +450,7 @@ public class Foreman implements Runnable {
final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
final int fragmentCount = planFragments.size();
int fragmentIndex = 0;
- for(PlanFragment planFragment : planFragments) {
+ for(final PlanFragment planFragment : planFragments) {
final FragmentHandle fragmentHandle = planFragment.getHandle();
sb.append("PlanFragment(");
sb.append(++fragmentIndex);
@@ -471,7 +474,7 @@ public class Foreman implements Runnable {
{
final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
- } catch(Exception e) {
+ } catch(final Exception e) {
// we've already set jsonString to a fallback value
}
sb.append(jsonString);
@@ -567,7 +570,7 @@ public class Foreman implements Runnable {
try {
autoCloseable.close();
- } catch(Exception e) {
+ } catch(final Exception e) {
/*
* Even if the query completed successfully, we'll still report failure if we have
* problems cleaning up.
@@ -582,11 +585,11 @@ public class Foreman implements Runnable {
Preconditions.checkState(!isClosed);
Preconditions.checkState(resultState != null);
- logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+ logger.info("foreman cleaning up.");
// These are straight forward removals from maps, so they won't throw.
drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
- drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+ drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
suppressingClose(queryContext);
@@ -612,8 +615,8 @@ public class Foreman implements Runnable {
.setQueryId(queryId)
.setQueryState(resultState);
if (resultException != null) {
- boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
- UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
+ final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+ final UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
resultBuilder.addError(uex.getOrCreatePBError(verbose));
}
@@ -626,7 +629,7 @@ public class Foreman implements Runnable {
try {
// send whatever result we ended up with
initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
- } catch(Exception e) {
+ } catch(final Exception e) {
addException(e);
logger.warn("Exception sending result to client", resultException);
}
@@ -658,7 +661,7 @@ public class Foreman implements Runnable {
}
@Override
- protected void processEvent(StateEvent event) {
+ protected void processEvent(final StateEvent event) {
final QueryState newState = event.newState;
final Exception exception = event.exception;
@@ -803,7 +806,7 @@ public class Foreman implements Runnable {
queryManager.addFragmentStatusTracker(rootFragment, true);
rootRunner = new FragmentExecutor(rootContext, rootOperator,
- queryManager.getRootStatusHandler(rootContext));
+ queryManager.newRootStatusHandler(rootContext));
final RootFragmentManager fragmentManager =
new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
@@ -836,7 +839,7 @@ public class Foreman implements Runnable {
final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
// record all fragments for status purposes.
- for (PlanFragment planFragment : fragments) {
+ for (final PlanFragment planFragment : fragments) {
logger.trace("Tracking intermediate remote node {} with data {}",
planFragment.getAssignment(), planFragment.getFragmentJson());
queryManager.addFragmentStatusTracker(planFragment, false);
@@ -855,40 +858,53 @@ public class Foreman implements Runnable {
* count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
* know if any submissions did fail.
*/
- final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+ final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
- for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
+ for (final DrillbitEndpoint ep : intFragmentMap.keySet()) {
sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
- // wait for the status of all requests sent above to be known
- boolean ready = false;
- while(!ready) {
- try {
- endpointLatch.await();
- ready = true;
- } catch (InterruptedException e) {
- // if we weren't ready, the while loop will continue to wait
- }
+ if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+ long numberRemaining = endpointLatch.getCount();
+ throw UserException.connectionError()
+ .message(
+ "Exceeded timeout while waiting send intermediate work fragments to remote nodes. Sent %d and only heard response back from %d nodes.",
+ intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+ .build();
}
+
// if any of the intermediate fragment submissions failed, fail the query
- final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
- fragmentSubmitFailures.submissionExceptions;
+ final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
if (submissionExceptions.size() > 0) {
- throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
- submissionExceptions.get(0).rpcException);
- // TODO indicate the failing drillbit?
- // TODO report on all the failures?
+ Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+
+ for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
+ DrillbitEndpoint endpoint = e.drillbitEndpoint;
+ if (endpoints.add(endpoint)) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(endpoint.getAddress());
+ }
+ }
+ throw UserException.connectionError(submissionExceptions.get(0).rpcException)
+ .message("Error setting up remote intermediate fragment execution")
+ .addContext("Nodes with failures", sb.toString())
+ .build();
}
/*
* Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
* the regular sendListener event delivery.
*/
- for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+ for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
}
}
@@ -906,7 +922,7 @@ public class Foreman implements Runnable {
@SuppressWarnings("resource")
final Controller controller = drillbitContext.getController();
final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
- for(PlanFragment planFragment : fragments) {
+ for(final PlanFragment planFragment : fragments) {
fb.addFragment(planFragment);
}
final InitializeFragments initFrags = fb.build();
@@ -926,12 +942,12 @@ public class Foreman implements Runnable {
*/
private static class FragmentSubmitFailures {
static class SubmissionException {
-// final DrillbitEndpoint drillbitEndpoint;
+ final DrillbitEndpoint drillbitEndpoint;
final RpcException rpcException;
SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
final RpcException rpcException) {
-// this.drillbitEndpoint = drillbitEndpoint;
+ this.drillbitEndpoint = drillbitEndpoint;
this.rpcException = rpcException;
}
}
@@ -999,16 +1015,7 @@ public class Foreman implements Runnable {
* to the user
*/
public void moveToState(final QueryState newState, final Exception ex) {
- boolean ready = false;
- while(!ready) {
- try {
- acceptExternalEvents.await();
- ready = true;
- } catch(InterruptedException e) {
- // if we're still not ready, the while loop will cause us to wait again
- logger.warn("Interrupted while waiting to move state.", e);
- }
- }
+ acceptExternalEvents.awaitUninterruptibly();
Foreman.this.moveToState(newState, ex);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 433ab26..ceb77f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -22,11 +22,16 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
public class FragmentData {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
+
private final boolean isLocal;
private volatile FragmentStatus status;
- private volatile long lastStatusUpdate = 0;
+ private volatile long lastStatusUpdate = System.currentTimeMillis();
+ private volatile long lastProgress = System.currentTimeMillis();
private final DrillbitEndpoint endpoint;
public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
@@ -43,13 +48,45 @@ public class FragmentData {
.build();
}
- public void setStatus(final FragmentStatus status) {
- this.status = status;
- lastStatusUpdate = System.currentTimeMillis();
+ /**
+ * Update the status for this fragment. Also records last update and last progress time.
+ * @param newStatus Updated status
+ * @return Whether or not the status update resulted in a FragmentState change.
+ */
+ public boolean setStatus(final FragmentStatus newStatus) {
+ final long time = System.currentTimeMillis();
+ final FragmentState oldState = status.getProfile().getState();
+ final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED;
+ final FragmentState currentState = newStatus.getProfile().getState();
+ final boolean stateChanged = currentState != oldState;
+
+ if (inTerminalState) {
+ // already in a terminal state. This shouldn't happen.
+ logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
+ QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
+ return false;
+ }
+
+ this.lastStatusUpdate = time;
+ if (madeProgress(status, newStatus)) {
+ this.lastProgress = time;
+ }
+ status = newStatus;
+
+ return stateChanged;
+ }
+
+ public FragmentState getState() {
+ return status.getProfile().getState();
}
- public FragmentStatus getStatus() {
- return status;
+ public MinorFragmentProfile getProfile() {
+ return status
+ .getProfile()
+ .toBuilder()
+ .setLastUpdate(lastStatusUpdate)
+ .setLastProgress(lastProgress)
+ .build();
}
public boolean isLocal() {
@@ -64,6 +101,34 @@ public class FragmentData {
return status.getHandle();
}
+ private boolean madeProgress(final FragmentStatus prev, final FragmentStatus cur) {
+ final MinorFragmentProfile previous = prev.getProfile();
+ final MinorFragmentProfile current = cur.getProfile();
+
+ if (previous.getState() != current.getState()) {
+ return true;
+ }
+
+ if (previous.getOperatorProfileCount() != current.getOperatorProfileCount()) {
+ return true;
+ }
+
+ for(int i =0; i < current.getOperatorProfileCount(); i++){
+ if (madeProgress(previous.getOperatorProfile(i), current.getOperatorProfile(i))) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean madeProgress(final OperatorProfile prev, final OperatorProfile cur) {
+ return prev.getInputProfileCount() != cur.getInputProfileCount()
+ || !prev.getInputProfileList().equals(cur.getInputProfileList())
+ || prev.getMetricCount() != cur.getMetricCount()
+ || !prev.getMetricList().equals(cur.getMetricList());
+ }
+
@Override
public String toString() {
return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 31b1f2b..34fa639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -21,11 +21,12 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -50,15 +51,17 @@ import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.StatusReporter;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
/**
* Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments.
*/
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+public class QueryManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
@@ -74,7 +77,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
.ephemeral()
.build();
- private final Set<DrillbitEndpoint> includedBits;
+ private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
private final StateListener stateListener;
private final QueryId queryId;
private final String stringQueryId;
@@ -94,8 +97,13 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
// the following mutable variables are used to capture ongoing query status
private String planText;
- private long startTime;
+ private long startTime = System.currentTimeMillis();
private long endTime;
+
+ // How many nodes have finished their execution. Query is complete when all nodes are complete.
+ private final AtomicInteger finishedNodes = new AtomicInteger(0);
+
+ // How many fragments have finished their execution.
private final AtomicInteger finishedFragments = new AtomicInteger(0);
public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
@@ -109,83 +117,27 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
try {
profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new DrillRuntimeException(e);
}
-
- includedBits = Sets.newHashSet();
- }
-
- @Override
- public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
- }
-
- @Override
- public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
- for(DrillbitEndpoint ep : unregisteredDrillbits) {
- if (includedBits.contains(ep)) {
- logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}",
- ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
- stateListener.moveToState(QueryState.FAILED,
- new ForemanException("One more more nodes lost connectivity during query. Identified node was "
- + ep.getAddress()));
- }
- }
}
- @Override
- public void statusUpdate(final FragmentStatus status) {
- logger.debug("New fragment status was provided to QueryManager of {}", status);
- switch(status.getProfile().getState()) {
- case AWAITING_ALLOCATION:
- case RUNNING:
- updateFragmentStatus(status);
- break;
-
- case FINISHED:
- fragmentDone(status);
- break;
-
- case CANCELLED:
- /*
- * TODO
- * This doesn't seem right; shouldn't this be similar to FAILED?
- * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
- *
- * So, we add it to the finishedFragments if we ourselves we receive a statusUpdate (from where),
- * but not if our cancellation listener gets it?
- */
- // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
- fragmentDone(status);
- break;
-
- case FAILED:
- stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
- break;
-
- default:
- throw new UnsupportedOperationException(String.format("Received status of %s", status));
- }
- }
-
- private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+ private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
final int majorFragmentId = fragmentHandle.getMajorFragmentId();
final int minorFragmentId = fragmentHandle.getMinorFragmentId();
- fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
+ final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
+ return data.setStatus(fragmentStatus);
}
private void fragmentDone(final FragmentStatus status) {
- updateFragmentStatus(status);
+ final boolean stateChanged = updateFragmentStatus(status);
- final int finishedFragments = this.finishedFragments.incrementAndGet();
- final int totalFragments = fragmentDataSet.size();
- assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
- final int remaining = totalFragments - finishedFragments;
- logger.debug("waiting for {} fragments", remaining);
- if (remaining == 0) {
- // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
- stateListener.moveToState(QueryState.COMPLETED, null);
+ if (stateChanged) {
+ // since we're in the fragment done clause and this was a change from previous
+ final NodeTracker node = nodeMap.get(status.getProfile().getEndpoint());
+ node.fragmentComplete();
+ finishedFragments.incrementAndGet();
}
}
@@ -201,9 +153,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
}
minorMap.put(minorFragmentId, fragmentData);
fragmentDataSet.add(fragmentData);
-
- // keep track of all the drill bits that are used by this query
- includedBits.add(fragmentData.getEndpoint());
}
public String getFragmentStatesAsString() {
@@ -211,7 +160,16 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
}
void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
- addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
+ final DrillbitEndpoint assignment = fragment.getAssignment();
+
+ NodeTracker tracker = nodeMap.get(assignment);
+ if (tracker == null) {
+ tracker = new NodeTracker(assignment);
+ nodeMap.put(assignment, tracker);
+ }
+
+ tracker.addFragment();
+ addFragment(new FragmentData(fragment.getHandle(), assignment, isRoot));
}
/**
@@ -219,23 +177,23 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
final Controller controller = drillbitContext.getController();
- for(FragmentData data : fragmentDataSet) {
- final FragmentStatus fragmentStatus = data.getStatus();
- switch(fragmentStatus.getProfile().getState()) {
+ for(final FragmentData data : fragmentDataSet) {
+ switch(data.getState()) {
case SENDING:
case AWAITING_ALLOCATION:
case RUNNING:
- if (rootRunner != null) {
+ if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
rootRunner.cancel();
} else {
final DrillbitEndpoint endpoint = data.getEndpoint();
- final FragmentHandle handle = fragmentStatus.getHandle();
+ final FragmentHandle handle = data.getHandle();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
}
break;
case FINISHED:
+ case CANCELLATION_REQUESTED:
case CANCELLED:
case FAILED:
// nothing to do
@@ -267,21 +225,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
}
}
- public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
- return new RootStatusReporter(context);
- }
-
- class RootStatusReporter extends AbstractStatusReporter {
- private RootStatusReporter(final FragmentContext context) {
- super(context);
- }
-
- @Override
- protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
- statusUpdate(status);
- }
- }
-
QueryState updateQueryStateInStore(final QueryState queryState) {
switch (queryState) {
case PENDING:
@@ -295,7 +238,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
case FAILED:
try {
profileEStore.delete(stringQueryId);
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.warn("Failure while trying to delete the estore profile for this query.", e);
}
@@ -345,7 +288,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
for (int v = 0; v < minorMap.allocated.length; v++) {
if (minorMap.allocated[v]) {
final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
- fb.addMinorFragmentProfile(data.getStatus().getProfile());
+ fb.addMinorFragmentProfile(data.getProfile());
}
}
profileBuilder.addFragmentProfile(fb);
@@ -366,4 +309,159 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
void markEndTime() {
endTime = System.currentTimeMillis();
}
+
+ /**
+ * Internal class used to track the number of pending completion messages required from a particular node. This lets
+ * the QueryManager know, for each node that is part of this query, how many fragments are still outstanding. In the
+ * case of a node failure, we can then correctly account for the outstanding messages that will never arrive.
+ *
+ * This is a non-static inner class: fragmentComplete() calls the enclosing QueryManager's nodeComplete().
+ */
+ private class NodeTracker {
+ // the node being tracked; stored by the constructor and not read within this class
+ private final DrillbitEndpoint endpoint;
+ // number of fragments registered for this node through addFragment()
+ private final AtomicInteger totalFragments = new AtomicInteger(0);
+ // number of fragments counted as complete through fragmentComplete()
+ private final AtomicInteger completedFragments = new AtomicInteger(0);
+
+ public NodeTracker(final DrillbitEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ /**
+ * Increments the number of fragments this node is running.
+ */
+ public void addFragment() {
+ totalFragments.incrementAndGet();
+ }
+
+ /**
+ * Increments the number of fragments completed on this node. When the incremented count equals the number of
+ * fragments registered for the node, nodeComplete() is called, which increments finishedNodes.
+ *
+ * Once every node has been counted as finished, nodeComplete() asks for the query to move to a completed state.
+ */
+ public void fragmentComplete() {
+ if (totalFragments.get() == completedFragments.incrementAndGet()) {
+ nodeComplete();
+ }
+ }
+
+ /**
+ * Increments the number of fragments completed on this node until the completed count reaches the total. Note that
+ * this goes through fragmentComplete(), and nodeComplete() is only triggered by the increment that lands exactly on
+ * the total, so whether we have failure or success the nodeComplete event is intended to occur only once. (Two
+ * threads could be counting fragments complete at the same time since this will likely come from an external
+ * event.)
+ */
+ public void nodeDead() {
+ while (completedFragments.get() < totalFragments.get()) {
+ fragmentComplete();
+ }
+ }
+
+ }
+
+ /**
+  * Records that one more node has finished all of its fragments. If no nodes remain pending, asks the state listener
+  * to move the query to COMPLETED; otherwise logs how many nodes and fragments are still outstanding.
+  *
+  * @throws IllegalArgumentException if the finished node count exceeds the number of tracked nodes
+  */
+ private void nodeComplete() {
+   final int done = this.finishedNodes.incrementAndGet();
+   final int expected = nodeMap.size();
+   Preconditions.checkArgument(done <= expected, "The finished node count exceeds the total node count");
+
+   final int outstandingNodes = expected - done;
+   if (outstandingNodes != 0) {
+     final int outstandingFragments = this.fragmentDataSet.size() - finishedFragments.get();
+     logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments",
+         outstandingNodes, outstandingFragments);
+     return;
+   }
+
+   // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
+   stateListener.moveToState(QueryState.COMPLETED, null);
+ }
+
+ /**
+ * Creates a status reporter for the given fragment context. The returned reporter is a RootStatusReporter, which
+ * passes status changes to this QueryManager's fragment status listener.
+ *
+ * @param context the fragment context handed to the reporter
+ * @return a new StatusReporter for that context
+ */
+ public StatusReporter newRootStatusHandler(final FragmentContext context) {
+ return new RootStatusReporter(context);
+ }
+
+ /**
+ * Status reporter that hands every status change directly to this QueryManager's fragmentStatusListener.
+ */
+ private class RootStatusReporter extends AbstractStatusReporter {
+ private RootStatusReporter(final FragmentContext context) {
+ super(context);
+ }
+
+ @Override
+ protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
+ // the handle argument is not used here; only the status is forwarded
+ fragmentStatusListener.statusUpdate(status);
+ }
+ }
+
+ /**
+ * @return the listener through which fragment status updates are delivered to this QueryManager
+ */
+ public FragmentStatusListener getFragmentStatusListener(){
+ return fragmentStatusListener;
+ }
+
+ /**
+ * Receives fragment status updates and dispatches on the reported fragment state: non-terminal states only refresh
+ * the stored status, while FAILED, FINISHED and CANCELLED go through fragmentDone(). Any other state is rejected
+ * with an UnsupportedOperationException.
+ */
+ private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener() {
+ @Override
+ public void statusUpdate(final FragmentStatus status) {
+ logger.debug("New fragment status was provided to QueryManager of {}", status);
+ switch(status.getProfile().getState()) {
+ case AWAITING_ALLOCATION:
+ case RUNNING:
+ case CANCELLATION_REQUESTED:
+ // fragment is still in flight: just record the latest status
+ updateFragmentStatus(status);
+ break;
+
+ case FAILED:
+ // request the FAILED query state first, then count the fragment as done like any other terminal state
+ stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
+ // fall-through.
+ case FINISHED:
+ case CANCELLED:
+ fragmentDone(status);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(String.format("Received status of %s", status));
+ }
+ }
+ };
+
+
+ /**
+ * @return the listener through which drillbit registration changes are delivered to this QueryManager
+ */
+ public DrillbitStatusListener getDrillbitStatusListener() {
+ return drillbitStatusListener;
+ }
+
+ /**
+  * Reacts to drillbits leaving the cluster while this query is in flight. Every unregistered drillbit that has a
+  * NodeTracker in nodeMap (i.e. was assigned fragments of this query) is marked dead, and if at least one such
+  * drillbit was found the query is moved to FAILED with the list of lost nodes.
+  */
+ private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() {
+
+   @Override
+   public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
+     // newly registered drillbits do not affect a query that is already running
+   }
+
+   @Override
+   public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+     final StringBuilder failedNodeList = new StringBuilder();
+     boolean atLeastOneFailure = false;
+
+     for (final DrillbitEndpoint ep : unregisteredDrillbits) {
+       final NodeTracker tracker = nodeMap.get(ep);
+       if (tracker == null) {
+         // this drillbit is not tracked for this query
+         continue;
+       }
+
+       // mark node as dead.
+       tracker.nodeDead();
+
+       // capture node name for exception or logging message
+       if (atLeastOneFailure) {
+         failedNodeList.append(", ");
+       } else {
+         atLeastOneFailure = true;
+       }
+       failedNodeList.append(ep.getAddress());
+       failedNodeList.append(":");
+       failedNodeList.append(ep.getUserPort());
+     }
+
+     // Only fail the query when a participating node was actually lost. The previous check was inverted
+     // (!atLeastOneFailure): it failed the query with an empty node list when no tracked node had gone away and
+     // stayed silent when one had.
+     if (atLeastOneFailure) {
+       logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}",
+           failedNodeList, QueryIdHelper.getQueryId(queryId));
+       stateListener.moveToState(QueryState.FAILED,
+           new ForemanException(String.format("One more more nodes lost connectivity during query. Identified nodes were [%s].",
+               failedNodeList)));
+     }
+   }
+ };
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 4ff28f3..8a40f1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -24,29 +24,29 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
public abstract class AbstractStatusReporter implements StatusReporter{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
- private FragmentContext context;
- private volatile long startNanos;
+ private final FragmentContext context;
- public AbstractStatusReporter(FragmentContext context) {
+ public AbstractStatusReporter(final FragmentContext context) {
super();
this.context = context;
}
- private FragmentStatus.Builder getBuilder(FragmentState state){
+ private FragmentStatus.Builder getBuilder(final FragmentState state){
return getBuilder(context, state, null);
}
- public static FragmentStatus.Builder getBuilder(FragmentContext context, FragmentState state, UserException ex){
- FragmentStatus.Builder status = FragmentStatus.newBuilder();
- MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+ public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){
+ final FragmentStatus.Builder status = FragmentStatus.newBuilder();
+ final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
context.getStats().addMetricsToStatus(b);
b.setState(state);
if(ex != null){
- boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+ final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
b.setError(ex.getOrCreatePBError(verbose));
}
status.setHandle(context.getHandle());
@@ -57,60 +57,36 @@ public abstract class AbstractStatusReporter implements StatusReporter{
}
@Override
- public void stateChanged(FragmentHandle handle, FragmentState newState) {
- FragmentStatus.Builder status = getBuilder(newState);
-
+ public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
+ final FragmentStatus.Builder status = getBuilder(newState);
+ logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState);
switch(newState){
case AWAITING_ALLOCATION:
- awaitingAllocation(handle, status);
- break;
+ case CANCELLATION_REQUESTED:
case CANCELLED:
- cancelled(handle, status);
- break;
- case FAILED:
- // no op since fail should have also been called.
- break;
case FINISHED:
- finished(handle, status);
- break;
case RUNNING:
- this.startNanos = System.nanoTime();
- running(handle, status);
+ statusChange(handle, status.build());
break;
case SENDING:
// no op.
break;
+ case FAILED:
+ // shouldn't get here since fail() should be called.
default:
- break;
-
+ throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
}
}
- protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
- statusChange(handle, statusBuilder.build());
- }
-
- protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
- statusChange(handle, statusBuilder.build());
- }
-
- protected void cancelled(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
- statusChange(handle, statusBuilder.build());
- }
-
- protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
- statusChange(handle, statusBuilder.build());
- }
-
protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
@Override
- public final void fail(FragmentHandle handle, String message, UserException excep) {
- FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
+ public final void fail(final FragmentHandle handle, final UserException excep) {
+ final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
fail(handle, status);
}
- protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+ private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) {
statusChange(handle, statusBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a4a97c9..3570ba5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,12 +18,14 @@
package org.apache.drill.exec.work.fragment;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.common.DeferredException;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContext.ExecutorState;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
@@ -42,36 +44,42 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
- // TODO: REVIEW: Can't this be AtomicReference<FragmentState> (so that
- // debugging and logging don't show just integer values--and for type safety)?
- private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+ private final String fragmentName;
private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
- private volatile boolean canceled;
- private volatile boolean closed;
- private RootExec root;
+ private final DeferredException deferredException = new DeferredException();
+ private volatile RootExec root;
+ private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+ private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
final StatusReporter listener) {
this.fragmentContext = context;
this.rootOperator = rootOperator;
this.listener = listener;
+ this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
+
+ context.setExecutorState(new ExecutorStateImpl());
}
@Override
public String toString() {
- return
- super.toString()
- + "[closed = " + closed
- + ", state = " + state
- + ", rootOperator = " + rootOperator
- + ", fragmentContext = " + fragmentContext
- + ", listener = " + listener
- + "]";
+ final StringBuilder builder = new StringBuilder();
+ builder.append("FragmentExecutor [fragmentContext=");
+ builder.append(fragmentContext);
+ builder.append(", fragmentState=");
+ builder.append(fragmentState);
+ builder.append("]");
+ return builder.toString();
}
+ /**
+ * Returns the current fragment status if the fragment is running. Otherwise, returns no status.
+ *
+ * @return FragmentStatus or null.
+ */
public FragmentStatus getStatus() {
/*
* If the query is not in a running state, the operator tree is still being constructed and
@@ -81,29 +89,46 @@ public class FragmentExecutor implements Runnable {
* before this check. This caused a concurrent modification exception as the list of operator
* stats is iterated over while collecting info, and added to while building the operator tree.
*/
- if(state.get() != FragmentState.RUNNING_VALUE) {
+ if (fragmentState.get() != FragmentState.RUNNING) {
return null;
}
- return AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null).build();
+ final FragmentStatus status = AbstractStatusReporter
+ .getBuilder(fragmentContext, FragmentState.RUNNING, null)
+ .build();
+ return status;
}
+ /**
+ * Cancel the execution of this fragment if it is in an appropriate state. Messages come from external sources.
+ */
public void cancel() {
+ acceptExternalEvents.awaitUninterruptibly();
+
/*
- * Note that this can be called from threads *other* than the one running this runnable(), so
- * we need to be careful about the state transitions that can result. We set the canceled flag,
- * and this is checked in the run() loop, where action will be taken as soon as possible.
- *
- * If the run loop has already exited, because we've already either completed or failed the query,
- * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+ * Note that this can be called from threads *other* than the one running this runnable(), so we need to be careful
+ * about the state transitions that can result. We set the cancel requested flag but the actual cancellation is
+ * managed by the run() loop.
*/
- canceled = true;
+ updateState(FragmentState.CANCELLATION_REQUESTED);
}
- public void receivingFragmentFinished(FragmentHandle handle) {
- cancel();
+ /**
+ * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
+ * called in the case that a limit query is executed.
+ *
+ * @param handle
+ * The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+ */
+ public void receivingFragmentFinished(final FragmentHandle handle) {
+ acceptExternalEvents.awaitUninterruptibly();
if (root != null) {
+ logger.info("Applying request for early sender termination for {} -> {}.",
+ QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
root.receivingFragmentFinished(handle);
+ } else {
+ logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
+ QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
}
}
@@ -114,184 +139,204 @@ public class FragmentExecutor implements Runnable {
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
+ final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
try {
- final String newThreadName = String.format("%s:frag:%s:%s",
- QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+
myThread.setName(newThreadName);
root = ImplCreator.getExec(fragmentContext, rootOperator);
+
clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+ updateState(FragmentState.RUNNING);
+
+ acceptExternalEvents.countDown();
logger.debug("Starting fragment runner. {}:{}",
fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
- if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
- logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
- return;
- }
/*
- * Run the query until root.next returns false OR cancel() changes the
- * state.
- * Note that we closeOutResources() here if we're done. That's because
- * this can also throw exceptions that we want to treat as failures of the
- * request, even if the request did fine up until this point. Any
- * failures there will be caught in the catch clause below, which will be
- * reported to the user. If they were to come from the finally clause,
- * the uncaught exception there will simply terminate this thread without
- * alerting the user--the behavior then is to hang.
+ * Run the query until root.next returns false OR we no longer need to continue.
*/
- while (state.get() == FragmentState.RUNNING_VALUE) {
- if (canceled) {
- logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
- // Change state checked by main loop to terminate it (if not already done):
- updateState(FragmentState.CANCELLED);
-
- fragmentContext.cancel();
-
- logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
-
- /*
- * The state will be altered because of the updateState(), which would cause
- * us to fall out of the enclosing while loop; we just short-circuit that here
- */
- break;
- }
-
- if (!root.next()) {
- if (fragmentContext.isFailed()) {
- internalFail(fragmentContext.getFailureCause());
- closeOutResources();
- } else {
- /*
- * Close out resources before we report success. We do this so that we'll get an
- * error if there's a problem cleaning up, even though the query execution portion
- * succeeded.
- */
- closeOutResources();
- updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
- }
- break;
- }
+ while (shouldContinue() && root.next()) {
+ // loop
}
+
+ updateState(FragmentState.FINISHED);
+
} catch (AssertionError | Exception e) {
- logger.warn("Error while initializing or executing fragment", e);
- fragmentContext.fail(e);
- internalFail(e);
+ fail(e);
} finally {
- clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
- // Final check to make sure RecordBatches are cleaned up.
+ // We need to make sure we countDown at least once. We'll do it here to guarantee that.
+ acceptExternalEvents.countDown();
+
closeOutResources();
+ // send the final state of the fragment. only the main execution thread can send the final state and it can
+ // only be sent once.
+ sendFinalState();
+
+ clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
+
myThread.setName(originalThreadName);
}
}
- private static final String CLOSE_FAILURE = "Failure while closing out resources";
-
- private void closeOutResources() {
- /*
- * Because of the way this method can be called, it needs to be idempotent; it must
- * be safe to call it more than once. We use this flag to bypass the body if it has
- * been called before.
- */
- synchronized(this) { // synchronize for the state of closed
- if (closed) {
- return;
- }
+ /**
+  * Utility method to check whether the fragment should keep running: it must not be in a terminal state and
+  * cancellation must not have been requested.
+  *
+  * @return Whether or not execution should continue.
+  */
+ private boolean shouldContinue() {
+   return !(isCompleted() || fragmentState.get() == FragmentState.CANCELLATION_REQUESTED);
+ }
- final DeferredException deferredException = fragmentContext.getDeferredException();
- try {
- root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
- } catch (RuntimeException e) {
- logger.warn(CLOSE_FAILURE, e);
- deferredException.addException(e);
- }
+ /**
+ * Returns true if the fragment is in a terminal state, as decided by isTerminal() for the current fragment state.
+ *
+ * @return Whether this fragment is in a terminal state.
+ */
+ private boolean isCompleted() {
+ return isTerminal(fragmentState.get());
+ }
- closed = true;
+ /**
+  * Reports the fragment's current state to the status listener. A FAILED state is reported through
+  * listener.fail() with a system-error UserException built from the deferred exception; any other state is reported
+  * through listener.stateChanged().
+  */
+ private void sendFinalState() {
+   final FragmentState finalState = fragmentState.get();
+
+   if (finalState != FragmentState.FAILED) {
+     listener.stateChanged(fragmentContext.getHandle(), finalState);
+     return;
+   }
+
+   // failure: wrap whatever was collected in the deferred exception, tagged with the major:minor fragment id
+   final FragmentHandle failedHandle = getContext().getHandle();
+   final UserException failure = UserException.systemError(deferredException.getAndClear())
+       .addIdentity(getContext().getIdentity())
+       .addContext("Fragment", failedHandle.getMajorFragmentId() + ":" + failedHandle.getMinorFragmentId())
+       .build();
+   listener.fail(fragmentContext.getHandle(), failure);
+ }
+
+
+ private void closeOutResources() {
- /*
- * This must be last, because this may throw deferred exceptions.
- * We are forced to wrap the checked exception (if any) so that it will be unchecked.
- */
try {
- fragmentContext.close();
- } catch(Exception e) {
- throw new RuntimeException("Error closing fragment context.", e);
+ root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+ } catch (final Exception e) {
+ fail(e);
}
- }
- private void internalFail(final Throwable excep) {
- state.set(FragmentState.FAILED_VALUE);
+ fragmentContext.close();
- UserException uex = UserException.systemError(excep).addIdentity(getContext().getIdentity()).build();
- listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", uex);
}
- /**
- * Updates the fragment state with the given state
- *
- * @param to target state
- */
- private void updateState(final FragmentState to) {
- state.set(to.getNumber());
- listener.stateChanged(fragmentContext.getHandle(), to);
+ private void warnStateChange(final FragmentState current, final FragmentState target) {
+ logger.warn("Ignoring unexpected state transition {} => {}.", current.name(), target.name());
}
- /**
- * Updates the fragment state only iff the current state matches the expected.
- *
- * @param expected expected current state
- * @param to target state
- * @return true only if update succeeds
- */
- private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
- final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
- if (success) {
- listener.stateChanged(fragmentContext.getHandle(), to);
- } else {
- logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
- expected.name(), to.name(), FragmentState.valueOf(state.get()));
+ private void errorStateChange(final FragmentState current, final FragmentState target) {
+ final String msg = "Invalid state transition %s => %s.";
+ throw new StateTransitionException(String.format(msg, current.name(), target.name()));
+ }
+
+ private synchronized boolean updateState(FragmentState target) {
+ final FragmentHandle handle = fragmentContext.getHandle();
+ final FragmentState current = fragmentState.get();
+ logger.info(fragmentName + ": State change requested from {} --> {} for ", current, target);
+ switch (target) {
+ case CANCELLATION_REQUESTED:
+ switch (current) {
+ case SENDING:
+ case AWAITING_ALLOCATION:
+ case RUNNING:
+ fragmentState.set(target);
+ listener.stateChanged(handle, target);
+ return true;
+
+ default:
+ warnStateChange(current, target);
+ return false;
+ }
+
+ case FINISHED:
+ if(current == FragmentState.CANCELLATION_REQUESTED){
+ target = FragmentState.CANCELLED;
+ }
+ // fall-through
+ case FAILED:
+ if(!isTerminal(current)){
+ fragmentState.set(target);
+ // don't notify listener until we finalize this terminal state.
+ return true;
+ } else if (current == FragmentState.FAILED) {
+ // no warn since we can call fail multiple times.
+ return false;
+ } else if (current == FragmentState.CANCELLED && target == FragmentState.FAILED) {
+ fragmentState.set(FragmentState.FAILED);
+ return true;
+ }else{
+ warnStateChange(current, target);
+ return false;
+ }
+
+ case RUNNING:
+ if(current == FragmentState.AWAITING_ALLOCATION){
+ fragmentState.set(target);
+ listener.stateChanged(handle, target);
+ return true;
+ }else{
+ errorStateChange(current, target);
+ }
+
+ // these should never be requested.
+ case SENDING:
+ case AWAITING_ALLOCATION:
+ case CANCELLED:
+ default:
+ errorStateChange(current, target);
}
- return success;
+
+ // errorStateChange() throw should mean this is never executed
+ throw new IllegalStateException();
}
- /**
- * Returns true if the fragment is in a terminal state
- */
- private boolean isCompleted() {
- return state.get() == FragmentState.CANCELLED_VALUE
- || state.get() == FragmentState.FAILED_VALUE
- || state.get() == FragmentState.FINISHED_VALUE;
+ private boolean isTerminal(final FragmentState state) {
+ return state == FragmentState.CANCELLED
+ || state == FragmentState.FAILED
+ || state == FragmentState.FINISHED;
}
/**
- * Update the state if current state matches expected or fail the fragment if state transition fails even though
- * fragment is not in a terminal state.
+ * Capture an exception and store it. Update state to failed status (if not already there). Does not immediately
+ * report status back to Foreman. Only the original thread can return status to the Foreman.
*
- * @param expected current expected state
- * @param to target state
- * @return true only if update succeeds
+ * @param excep
+ * The failure that occurred.
*/
- private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
- final boolean updated = checkAndUpdateState(expected, to);
- if (!updated && !isCompleted()) {
- final String msg = "State was different than expected while attempting to update state from %s to %s"
- + "however current state was %s.";
- internalFail(new StateTransitionException(
- String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
- }
- return updated;
+ private void fail(final Throwable excep) {
+ deferredException.addThrowable(excep);
+ updateState(FragmentState.FAILED);
}
public FragmentContext getContext() {
return fragmentContext;
}
+ private class ExecutorStateImpl implements ExecutorState {
+ public boolean shouldContinue() {
+ return FragmentExecutor.this.shouldContinue();
+ }
+
+ public void fail(final Throwable t) {
+ FragmentExecutor.this.fail(t);
+ }
+
+ public boolean isFailed() {
+ return fragmentState.get() == FragmentState.FAILED;
+ }
+ public Throwable getFailureCause(){
+ return deferredException.getException();
+ }
+ }
+
private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
@Override
public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 7a819c4..0ba91b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -17,14 +17,14 @@
*/
package org.apache.drill.exec.work.fragment;
+import java.io.IOException;
+
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
-import java.io.IOException;
-
/**
* The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources
* are avialable, a fragment manager will start a fragment executor to run the associated fragment.
@@ -57,6 +57,8 @@ public interface FragmentManager {
public abstract void addConnection(RemoteConnection connection);
+ public void receivingFragmentFinished(final FragmentHandle handle);
+
/**
* Sets autoRead property on all connections
* @param autoRead
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 41e87cd..a5b928b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -33,19 +33,23 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.foreman.ForemanException;
+import com.google.common.base.Preconditions;
+
/**
* This managers determines when to run a non-root fragment node.
*/
// TODO a lot of this is the same as RootFragmentManager
public class NonRootFragmentManager implements FragmentManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
+
private final PlanFragment fragment;
private FragmentRoot root;
private final IncomingBuffers buffers;
- private final StatusReporter runnerListener;
- private volatile FragmentExecutor runner;
+ private final FragmentExecutor runner;
private volatile boolean cancel = false;
private final FragmentContext context;
- private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+ private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+ private volatile boolean runnerRetrieved = false;
public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
throws ExecutionSetupException {
@@ -54,8 +58,10 @@ public class NonRootFragmentManager implements FragmentManager {
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(root, this.context);
+ final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
+ fragment.getForeman()));
+ this.runner = new FragmentExecutor(this.context, root, reporter);
this.context.setBuffers(buffers);
- this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
} catch (ForemanException | IOException e) {
throw new FragmentSetupException("Failure while decoding fragment.", e);
@@ -66,7 +72,7 @@ public class NonRootFragmentManager implements FragmentManager {
* @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
*/
@Override
- public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException {
+ public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
return buffers.batchArrived(batch);
}
@@ -76,16 +82,22 @@ public class NonRootFragmentManager implements FragmentManager {
@Override
public FragmentExecutor getRunnable() {
synchronized(this) {
- if (runner != null) {
- throw new IllegalStateException("Get Runnable can only be run once.");
- }
+
+ // historically, we had issues where we tried to run the same fragment multiple times. Let's check to make sure
+ // this isn't happening.
+ Preconditions.checkArgument(!runnerRetrieved, "Get Runnable can only be run once.");
+
if (cancel) {
return null;
}
- runner = new FragmentExecutor(context, root, runnerListener);
+ runnerRetrieved = true;
return runner;
}
+ }
+ @Override
+ public void receivingFragmentFinished(final FragmentHandle handle) {
+ runner.receivingFragmentFinished(handle);
}
/* (non-Javadoc)
@@ -95,9 +107,7 @@ public class NonRootFragmentManager implements FragmentManager {
public void cancel() {
synchronized(this) {
cancel = true;
- if (runner != null) {
- runner.cancel();
- }
+ runner.cancel();
}
}
@@ -117,13 +127,13 @@ public class NonRootFragmentManager implements FragmentManager {
}
@Override
- public void addConnection(RemoteConnection connection) {
+ public void addConnection(final RemoteConnection connection) {
connections.add(connection);
}
@Override
- public void setAutoRead(boolean autoRead) {
- for (RemoteConnection c : connections) {
+ public void setAutoRead(final boolean autoRead) {
+ for (final RemoteConnection c : connections) {
c.setAutoRead(autoRead);
}
}
[2/7] drill git commit: DRILL-2803: Fix compound hash function to use
seed value for hash if additional value is null.
Posted by ja...@apache.org.
DRILL-2803: Fix compound hash function to use seed value for hash if additional value is null.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/071ed89b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/071ed89b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/071ed89b
Branch: refs/heads/master
Commit: 071ed89b10b3b5b49ac0286f4717d74431e1efb3
Parents: cf15546
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 15 19:45:56 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 09:11:12 2015 -0700
----------------------------------------------------------------------
.../expr/fn/impl/Hash64FunctionsWithSeed.java | 32 ++++++++++----------
.../expr/fn/impl/Hash64WithSeedAsDouble.java | 18 +++++------
2 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/071ed89b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
index 3a1edd3..e162b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64FunctionsWithSeed.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.expr.fn.impl;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.DateHolder;
@@ -70,7 +70,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -106,7 +106,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -142,7 +142,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
}
@@ -162,7 +162,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
}
@@ -182,7 +182,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.end, in.buffer, seed.value);
}
@@ -202,7 +202,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
}
else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
@@ -222,7 +222,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
}
else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
@@ -336,7 +336,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -370,7 +370,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -404,7 +404,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -438,7 +438,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -472,7 +472,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -506,7 +506,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.start + NullableDecimal28SparseHolder.WIDTH, in.buffer, seed.value);
}
@@ -540,7 +540,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.start, in.start + NullableDecimal38SparseHolder.WIDTH, in.buffer, seed.value);
}
@@ -560,7 +560,7 @@ public class Hash64FunctionsWithSeed {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/071ed89b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
index 0cbac1b..6922c31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash64WithSeedAsDouble.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.expr.fn.impl;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.Decimal18Holder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
@@ -61,7 +61,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
}
@@ -97,7 +97,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(in.value, seed.value);
}
@@ -133,7 +133,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
}
else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
@@ -153,7 +153,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
}
else {
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64((double) in.value, seed.value);
@@ -221,7 +221,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
java.math.BigDecimal input = new java.math.BigDecimal(java.math.BigInteger.valueOf(in.value), in.scale);
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(input.doubleValue(), seed.value);
@@ -257,7 +257,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
java.math.BigDecimal input = new java.math.BigDecimal(java.math.BigInteger.valueOf(in.value), in.scale);
out.value = org.apache.drill.exec.expr.fn.impl.XXHash.hash64(input.doubleValue(), seed.value);
@@ -294,7 +294,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
java.math.BigDecimal input = org.apache.drill.exec.util.DecimalUtility.getBigDecimalFromSparse(in.buffer,
in.start, in.nDecimalDigits, in.scale);
@@ -332,7 +332,7 @@ public class Hash64WithSeedAsDouble {
public void eval() {
if (in.isSet == 0) {
- out.value = 0;
+ out.value = seed.value;
} else {
java.math.BigDecimal input = org.apache.drill.exec.util.DecimalUtility.getBigDecimalFromSparse(in.buffer,
in.start, in.nDecimalDigits, in.scale);
[7/7] drill git commit: DRILL-2825: Bump test memory to 3g due to
sporadic OOM on TestLargeFileCompilation.
Posted by ja...@apache.org.
DRILL-2825: Bump test memory to 3g due to sporadic OOM on TestLargeFileCompilation.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ec257ef
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ec257ef
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ec257ef
Branch: refs/heads/master
Commit: 9ec257efb7992209e27c82e6f4ee8a5b12cc95e4
Parents: c0d5a69
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Apr 18 09:19:48 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 18:01:45 2015 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9ec257ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f6bcd91..3b246d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -365,7 +365,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
- <argLine>-Xms512m -Xmx2g -Ddrill.exec.http.enabled=false
+ <argLine>-Xms512m -Xmx3g -Ddrill.exec.http.enabled=false
-Ddrill.exec.sys.store.provider.local.write=false
-Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
-XX:MaxPermSize=256M -XX:MaxDirectMemorySize=3072M
[6/7] drill git commit: DRILL-2762: Update Fragment state reporting
and error collection
Posted by ja...@apache.org.
DRILL-2762: Update Fragment state reporting and error collection
DeferredException
- Add new throwAndClear operation to allow checking for exceptions during preClose in FragmentContext
- Add new getAndClear operation
BufferManager
- Ensure close() can be called multiple times by clearing managed buffer list on close().
FragmentContext/FragmentExecutor
- Update FragmentContext to have a preClose so that we can check closure state before doing final close.
- Update so that there is only a single state maintained between FragmentContext and FragmentExecutor
- Clean up FragmentExecutor run() method to better manage error states and have only single terminal point (avoiding multiple messages to Foreman).
- Add new CANCELLATION_REQUESTED state for FragmentState.
- Move all users of isCancelled or isFailed in main code to use shouldContinue()
- Update receivingFragmentFinished message to not cancel fragment (only inform root operator of cancellation)
WorkManager Updates
- Add new afterExecute command to the WorkManager ExecutorService so that we get log entries if a thread leaks an exception. (Otherwise logs don't show these exceptions and they only go to standard out.)
Profile Page
- Update profile page to show last update and last progress.
- Change durations to non-time presentation
Foreman/QueryManager
- Extract listenable interfaces into anonymous inner classes from body of Foreman
QueryManager
- Update QueryManager to track completed nodes rather than completed fragments using NodeTracker
- Update DrillbitStatusListener to decrement expected completion messages on Nodes that have died to avoid query hang when a node dies
FragmentData/MinorFragmentProfile
- Add ability to track last status update as well as last time fragment made progress
AbstractRecordBatch
- Update awareness of current cancellation state to avoid cancellation delays
Misc. Other changes
- Move ByteCode optimization code to only record assembly and code as trace messages
- Update SimpleRootExec to create fake ExecutorState to make existing tests work.
- Update sort to exit prematurely in the case that the fragment was asked to cancel.
- Add finals to all edited files.
- Modify control handler and FragmentManager to directly support receivingFragmentFinished
- Update receiver propagation message to avoid premature removal of fragment manager
- Update UserException.Builder to log a message if we're creating a new UserException (ERROR for System, INFO otherwise).
- Update Profile pages to use min and max instead of sorts.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c0d5a693
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c0d5a693
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c0d5a693
Branch: refs/heads/master
Commit: c0d5a693ac70c42019b5841eea91252f9eaa7792
Parents: 071ed89
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 8 19:06:03 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 17:56:21 2015 -0700
----------------------------------------------------------------------
.../apache/drill/common/DeferredException.java | 47 ++-
.../drill/common/concurrent/ExtendedLatch.java | 89 +++++
.../drill/common/exceptions/UserException.java | 66 ++--
.../org/apache/drill/exec/compile/AsmUtil.java | 28 +-
.../compile/bytecode/InstructionModifier.java | 28 +-
.../compile/bytecode/ScalarReplacementNode.java | 10 +-
.../apache/drill/exec/ops/BufferManager.java | 1 +
.../apache/drill/exec/ops/FragmentContext.java | 143 ++++----
.../drill/exec/physical/impl/BaseRootExec.java | 10 +-
.../impl/mergereceiver/MergingRecordBatch.java | 143 ++++----
.../impl/producer/ProducerConsumerBatch.java | 48 ++-
.../impl/project/ProjectRecordBatch.java | 250 +++++++------
.../UnorderedReceiverBatch.java | 39 +-
.../physical/impl/xsort/ExternalSortBatch.java | 4 +
.../exec/physical/impl/xsort/MSortTemplate.java | 43 ++-
.../drill/exec/proto/helper/QueryIdHelper.java | 23 +-
.../drill/exec/record/AbstractRecordBatch.java | 26 +-
.../exec/record/FragmentWritableBatch.java | 50 ++-
.../exec/rpc/data/DataResponseHandlerImpl.java | 11 +-
.../org/apache/drill/exec/server/Drillbit.java | 16 +-
.../exec/server/rest/profile/Comparators.java | 34 +-
.../server/rest/profile/FragmentWrapper.java | 73 ++--
.../server/rest/profile/OperatorWrapper.java | 26 +-
.../server/rest/profile/ProfileWrapper.java | 49 +--
.../exec/server/rest/profile/TableBuilder.java | 77 ++--
.../org/apache/drill/exec/work/WorkManager.java | 34 +-
.../exec/work/batch/ControlHandlerImpl.java | 20 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 37 +-
.../apache/drill/exec/work/foreman/Foreman.java | 127 +++----
.../drill/exec/work/foreman/FragmentData.java | 77 +++-
.../drill/exec/work/foreman/QueryManager.java | 292 ++++++++++-----
.../work/fragment/AbstractStatusReporter.java | 62 +---
.../exec/work/fragment/FragmentExecutor.java | 357 +++++++++++--------
.../exec/work/fragment/FragmentManager.java | 6 +-
.../work/fragment/NonRootFragmentManager.java | 40 ++-
.../exec/work/fragment/RootFragmentManager.java | 17 +-
.../work/fragment/StateTransitionException.java | 12 +-
.../exec/work/fragment/StatusReporter.java | 2 +-
.../exec/physical/impl/SimpleRootExec.java | 40 ++-
.../work/batch/TestUnlimitedBatchBuffer.java | 4 +-
.../drill/exec/proto/SchemaUserBitShared.java | 14 +
.../apache/drill/exec/proto/UserBitShared.java | 249 +++++++++++--
.../drill/exec/proto/beans/FragmentState.java | 4 +-
.../exec/proto/beans/MinorFragmentProfile.java | 44 +++
protocol/src/main/protobuf/UserBitShared.proto | 3 +
45 files changed, 1718 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/DeferredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/DeferredException.java b/common/src/main/java/org/apache/drill/common/DeferredException.java
index 99f18f1..c7111a9 100644
--- a/common/src/main/java/org/apache/drill/common/DeferredException.java
+++ b/common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -71,9 +71,32 @@ public class DeferredException implements AutoCloseable {
*
* @return the deferred exception, or null
*/
- public Exception getException() {
- synchronized(this) {
- return exception;
+ public synchronized Exception getException() {
+ return exception;
+ }
+
+ public synchronized Exception getAndClear() {
+ Preconditions.checkState(!isClosed);
+
+ if (exception != null) {
+ final Exception local = exception;
+ exception = null;
+ return local;
+ }
+
+ return null;
+ }
+
+ /**
+ * If an exception exists, will throw the exception and then clear it. This is so that, in cases where we want to reuse
+ * DeferredException, we don't double report the same exception.
+ *
+ * @throws Exception
+ */
+ public synchronized void throwAndClear() throws Exception{
+ final Exception e = getAndClear();
+ if (e != null) {
+ throw e;
}
}
@@ -98,24 +121,18 @@ public class DeferredException implements AutoCloseable {
try {
autoCloseable.close();
- } catch(Exception e) {
+ } catch(final Exception e) {
addException(e);
}
}
}
@Override
- public void close() throws Exception {
- synchronized(this) {
- Preconditions.checkState(!isClosed);
-
- try {
- if (exception != null) {
- throw exception;
- }
- } finally {
- isClosed = true;
- }
+ public synchronized void close() throws Exception {
+ try {
+ throwAndClear();
+ } finally {
+ isClosed = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
new file mode 100644
index 0000000..a75ac32
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A CountDownLatch that adds await variants which catch InterruptedException and resume waiting instead of propagating it.
+ */
+public class ExtendedLatch extends CountDownLatch {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExtendedLatch.class);
+
+ public ExtendedLatch() {
+ super(1);
+ }
+
+ public ExtendedLatch(final int count) {
+ super(count);
+ }
+
+ /**
+ * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
+ * state variable or similar. This base implementation always returns true.
+ *
+ * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
+ */
+ protected boolean ignoreInterruptions() {
+ return true;
+ }
+
+ /**
+ * Await for at most the given time, resuming the wait with the remaining time after each interruption. The interrupt is swallowed and not re-asserted here.
+ * @param waitMillis
+ * Maximum time in milliseconds to wait, measured with System.currentTimeMillis()
+ * @return true if the countdown reached zero before the deadline, false if the time ran out first.
+ */
+ public boolean awaitUninterruptibly(long waitMillis) {
+ final long targetMillis = System.currentTimeMillis() + waitMillis;
+ while (System.currentTimeMillis() < targetMillis) {
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ return false;
+ }
+
+ try {
+ return await(wait, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ // interrupted before the wait finished: loop again and wait for whatever time remains
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Await until the countdown reaches zero. On interruption, log a warning and keep waiting for as long as
+ * ignoreInterruptions() returns true; once it returns false, return even though the countdown may not be zero.
+ */
+ public void awaitUninterruptibly() {
+ while (true) {
+ try {
+ await();
+ return;
+ } catch (final InterruptedException e) {
+ if (ignoreInterruptions()) {
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting for event latch.", e);
+ } else {
+ return;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index e995346..9283339 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -53,7 +53,7 @@ public class UserException extends DrillRuntimeException {
* Rpc layer or UserResultListener.submitFailed()
*/
@Deprecated
- public static Builder systemError(Throwable cause) {
+ public static Builder systemError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.SYSTEM, cause);
}
@@ -79,7 +79,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder connectionError(Throwable cause) {
+ public static Builder connectionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.CONNECTION, cause);
}
@@ -105,7 +105,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder dataReadError(Throwable cause) {
+ public static Builder dataReadError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.DATA_READ, cause);
}
@@ -131,7 +131,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder dataWriteError(Throwable cause) {
+ public static Builder dataWriteError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.DATA_WRITE, cause);
}
@@ -157,7 +157,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder functionError(Throwable cause) {
+ public static Builder functionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.FUNCTION, cause);
}
@@ -183,7 +183,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder parseError(Throwable cause) {
+ public static Builder parseError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PARSE, cause);
}
@@ -209,7 +209,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder permissionError(Throwable cause) {
+ public static Builder permissionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PERMISSION, cause);
}
@@ -235,7 +235,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder planError(Throwable cause) {
+ public static Builder planError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PLAN, cause);
}
@@ -261,7 +261,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder resourceError(Throwable cause) {
+ public static Builder resourceError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.RESOURCE, cause);
}
@@ -287,7 +287,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder unsupportedError(Throwable cause) {
+ public static Builder unsupportedError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.UNSUPPORTED_OPERATION, cause);
}
@@ -313,7 +313,7 @@ public class UserException extends DrillRuntimeException {
* or doesn't wrap a user exception
* @param cause exception to wrap inside a user exception. Can be null
*/
- private Builder(DrillPBError.ErrorType errorType, Throwable cause) {
+ private Builder(final DrillPBError.ErrorType errorType, final Throwable cause) {
this.cause = cause;
//TODO handle the improbable case where cause is a SYSTEM exception ?
@@ -339,7 +339,7 @@ public class UserException extends DrillRuntimeException {
* @param args Arguments referenced by the format specifiers in the format string
* @return this builder
*/
- public Builder message(String format, Object... args) {
+ public Builder message(final String format, final Object... args) {
// we can't replace the message of a user exception
if (uex == null && format != null) {
this.message = String.format(format, args);
@@ -353,7 +353,7 @@ public class UserException extends DrillRuntimeException {
*
* @param endpoint drillbit endpoint identity
*/
- public Builder addIdentity(CoordinationProtos.DrillbitEndpoint endpoint) {
+ public Builder addIdentity(final CoordinationProtos.DrillbitEndpoint endpoint) {
context.add(endpoint);
return this;
}
@@ -363,7 +363,7 @@ public class UserException extends DrillRuntimeException {
* @param value string line
* @return this builder
*/
- public Builder addContext(String value) {
+ public Builder addContext(final String value) {
context.add(value);
return this;
}
@@ -375,7 +375,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, String value) {
+ public Builder addContext(final String name, final String value) {
context.add(name, value);
return this;
}
@@ -387,7 +387,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, long value) {
+ public Builder addContext(final String name, final long value) {
context.add(name, value);
return this;
}
@@ -399,7 +399,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, double value) {
+ public Builder addContext(final String name, final double value) {
context.add(name, value);
return this;
}
@@ -410,7 +410,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String value) {
+ public Builder pushContext(final String value) {
context.push(value);
return this;
}
@@ -422,7 +422,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, String value) {
+ public Builder pushContext(final String name, final String value) {
context.push(name, value);
return this;
}
@@ -434,7 +434,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, long value) {
+ public Builder pushContext(final String name, final long value) {
context.push(name, value);
return this;
}
@@ -446,7 +446,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, double value) {
+ public Builder pushContext(final String name, final double value) {
context.push(name, value);
return this;
}
@@ -462,7 +462,19 @@ public class UserException extends DrillRuntimeException {
return uex;
}
- return new UserException(this);
+ final UserException newException = new UserException(this);
+
+ // since we just created a new exception, we should log it for later reference. If this is a system error, this is
+ // an issue that the Drill admin should pay attention to and we should log as ERROR. However, if this is a user
+ // mistake or data read issue, the system admin should not be concerned about these and thus we'll log this
+ // as an INFO message.
+ if (errorType == DrillPBError.ErrorType.SYSTEM) {
+ logger.error(newException.getMessage(), newException);
+ } else {
+ logger.info("User Error Occurred", newException);
+ }
+
+ return newException;
}
}
@@ -470,14 +482,14 @@ public class UserException extends DrillRuntimeException {
private final UserExceptionContext context;
- protected UserException(DrillPBError.ErrorType errorType, String message, Throwable cause) {
+ protected UserException(final DrillPBError.ErrorType errorType, final String message, final Throwable cause) {
super(message, cause);
this.errorType = errorType;
this.context = new UserExceptionContext();
}
- private UserException(Builder builder) {
+ private UserException(final Builder builder) {
super(builder.message, builder.cause);
this.errorType = builder.errorType;
this.context = builder.context;
@@ -516,10 +528,10 @@ public class UserException extends DrillRuntimeException {
* @param verbose should the error object contain the verbose error message ?
* @return protobuf error object
*/
- public DrillPBError getOrCreatePBError(boolean verbose) {
- String message = verbose ? getVerboseMessage() : getMessage();
+ public DrillPBError getOrCreatePBError(final boolean verbose) {
+ final String message = verbose ? getVerboseMessage() : getMessage();
- DrillPBError.Builder builder = DrillPBError.newBuilder();
+ final DrillPBError.Builder builder = DrillPBError.newBuilder();
builder.setErrorType(errorType);
builder.setErrorId(context.getErrorId());
if (context.getEndpoint() != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
index 81904df..5e7a9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
@@ -54,7 +54,7 @@ public class AsmUtil {
final ClassReader ver = new ClassReader(verifyWriter.toByteArray());
try {
DrillCheckClassAdapter.verify(ver, false, new PrintWriter(sw));
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.info("Caught exception verifying class:");
logClass(logger, logTag, classNode);
throw e;
@@ -98,19 +98,25 @@ public class AsmUtil {
/**
* Write a class to the log.
*
- * <p>Writes at level DEBUG.
+ * <p>
+ * Writes at level TRACE.
*
- * @param logger the logger to write to
- * @param logTag a tag to print to the log
- * @param classNode the class
+ * @param logger
+ * the logger to write to
+ * @param logTag
+ * a tag to print to the log
+ * @param classNode
+ * the class
*/
public static void logClass(final Logger logger, final String logTag, final ClassNode classNode) {
- logger.debug(logTag);
- final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter);
- final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
- classNode.accept(traceClassVisitor);
- logger.debug(stringWriter.toString());
+ if (logger.isTraceEnabled()) {
+ logger.trace(logTag);
+ final StringWriter stringWriter = new StringWriter();
+ final PrintWriter printWriter = new PrintWriter(stringWriter);
+ final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
+ classNode.accept(traceClassVisitor);
+ logger.trace(stringWriter.toString());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
index 6c0292e..d25b1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
@@ -51,12 +51,6 @@ public class InstructionModifier extends MethodVisitor {
private int stackIncrease = 0; // how much larger we have to make the stack
- /*
- * True if we've transferred holder members to locals. If so, then the maximum stack
- * size must be increased by one to accommodate the extra DUP that is used to do so.
- */
- private boolean transferredToLocals = false;
-
public InstructionModifier(final int access, final String name, final String desc,
final String signature, final String[] exceptions, final TrackingInstructionList list,
final MethodVisitor inner) {
@@ -116,7 +110,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitInsn(int opcode) {
+ public void visitInsn(final int opcode) {
switch (opcode) {
case Opcodes.DUP:
/*
@@ -276,7 +270,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitTypeInsn(int opcode, String type) {
+ public void visitTypeInsn(final int opcode, final String type) {
/*
* This includes NEW, NEWARRAY, CHECKCAST, or INSTANCEOF.
*
@@ -285,9 +279,9 @@ public class InstructionModifier extends MethodVisitor {
* replaced the values for those, but we might find other reasons to replace
* things, in which case this will be too broad.
*/
- ReplacingBasicValue r = getFunctionReturn();
+ final ReplacingBasicValue r = getFunctionReturn();
if (r != null) {
- ValueHolderSub sub = r.getIden().getHolderSub(adder);
+ final ValueHolderSub sub = r.getIden().getHolderSub(adder);
oldToNew.put(r.getIndex(), sub);
} else {
super.visitTypeInsn(opcode, type);
@@ -295,7 +289,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitLineNumber(int line, Label start) {
+ public void visitLineNumber(final int line, final Label start) {
lastLineNumber = line;
super.visitLineNumber(line, start);
}
@@ -347,7 +341,7 @@ public class InstructionModifier extends MethodVisitor {
@Override
public void visitMaxs(final int maxStack, final int maxLocals) {
- super.visitMaxs(maxStack + stackIncrease + (transferredToLocals ? 1 : 0), maxLocals);
+ super.visitMaxs(maxStack + stackIncrease, maxLocals);
}
@Override
@@ -397,7 +391,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+ public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc) {
/*
* This method was deprecated in the switch from api version ASM4 to ASM5.
* If we ever go back (via CompilationConfig.ASM_API_VERSION), we need to
@@ -408,7 +402,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) {
+ public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc, final boolean itf) {
// this version of visitMethodInsn() came after ASM4
assert CompilationConfig.ASM_API_VERSION != Opcodes.ASM4;
@@ -471,7 +465,7 @@ public class InstructionModifier extends MethodVisitor {
@Override
public void visitEnd() {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("InstructionModifier ");
sb.append(name);
@@ -488,7 +482,7 @@ public class InstructionModifier extends MethodVisitor {
int itemCount = 0; // counts up the number of items found
final HashMap<ValueHolderIden, Integer> seenIdens = new HashMap<>(); // iden -> idenId
sb.append(" .oldToNew:\n");
- for (IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
+ for (final IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
final ValueHolderIden iden = ioc.value.iden();
if (!seenIdens.containsKey(iden)) {
seenIdens.put(iden, ++idenId);
@@ -501,7 +495,7 @@ public class InstructionModifier extends MethodVisitor {
}
sb.append(" .oldLocalToFirst:\n");
- for (IntIntCursor iic : oldLocalToFirst) {
+ for (final IntIntCursor iic : oldLocalToFirst) {
sb.append(" " + iic.key + " => " + iic.value + '\n');
++itemCount;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
index 6e981bc..adbf2fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
@@ -60,14 +60,14 @@ public class ScalarReplacementNode extends MethodNode {
Frame<BasicValue>[] frames;
try {
frames = analyzer.analyze(className, this);
- } catch (AnalyzerException e) {
+ } catch (final AnalyzerException e) {
throw new IllegalStateException(e);
}
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("ReplacingBasicValues for " + className + "\n");
- for(ReplacingBasicValue value : valueList) {
+ for(final ReplacingBasicValue value : valueList) {
value.dump(sb, 2);
sb.append('\n');
}
@@ -75,14 +75,14 @@ public class ScalarReplacementNode extends MethodNode {
}
// wrap the instruction handler so that we can do additional things
- TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
+ final TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
this.instructions = list;
MethodVisitor methodVisitor = inner;
if (verifyBytecode) {
methodVisitor = new CheckMethodVisitorFsm(CompilationConfig.ASM_API_VERSION, methodVisitor);
}
- InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
+ final InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
this.signature, this.exceptionsArr, list, methodVisitor);
accept(holderV);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 2d22d84..c953bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -58,6 +58,7 @@ public class BufferManager implements AutoCloseable {
((DrillBuf)mbuffers[i]).release();
}
}
+ managedBuffers.clear();
}
public DrillBuf replace(DrillBuf old, int newSize) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 44ca78a..c46613d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.ops;
-import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -27,7 +26,6 @@ import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
-import org.apache.drill.common.DeferredException;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -53,6 +51,8 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
@@ -72,31 +72,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private IncomingBuffers buffers;
private final OptionManager fragmentOptions;
private final BufferManager bufferManager;
+ private ExecutorState executorState;
- private final DeferredException deferredException = new DeferredException();
- private volatile FragmentContextState state = FragmentContextState.OK;
private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-
@Override
- public void accept(RpcException e) {
+ public void accept(final RpcException e) {
fail(e);
}
};
+
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
private final AccountingUserConnection accountingUserConnection;
- /*
- * TODO we need a state that indicates that cancellation has been requested and
- * is in progress. Early termination (such as from limit queries) could also use
- * this, as the cleanup steps should be exactly the same.
- */
- private static enum FragmentContextState {
- OK,
- FAILED,
- CANCELED
- }
-
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
@@ -111,14 +99,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
try {
- OptionList list;
+ final OptionList list;
if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
list = new OptionList();
} else {
list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
}
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
@@ -127,8 +115,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
try {
allocator = context.getAllocator().getChildAllocator(
this, fragment.getMemInitial(), fragment.getMemMax(), true);
- assert (allocator != null);
- } catch(Throwable e) {
+ Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
+ } catch(final Throwable e) {
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}
@@ -140,37 +128,29 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return fragmentOptions;
}
- public void setBuffers(IncomingBuffers buffers) {
+ public void setBuffers(final IncomingBuffers buffers) {
+ Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
this.buffers = buffers;
}
- public void fail(Throwable cause) {
- final FragmentHandle fragmentHandle = fragment.getHandle();
-
- UserException dse = UserException.systemError(cause).addIdentity(getIdentity()).build();
-
- // log the error id
- logger.error("Fragment Context received failure -- Fragment: {}:{}",
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), dse);
-
- setState(FragmentContextState.FAILED);
- deferredException.addThrowable(dse);
+ public void setExecutorState(final ExecutorState executorState) {
+ Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+ this.executorState = executorState;
}
- public void cancel() {
- setState(FragmentContextState.CANCELED);
+ public void fail(final Throwable cause) {
+ executorState.fail(cause);
}
/**
- * Allowed transitions from left to right: OK -> FAILED -> CANCELED
- * @param newState
+ * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
+ * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
+ * often so that Drill is responsive to cancellation operations.
+ *
+ * @return false if the action should terminate immediately, true if everything is okay.
*/
- private synchronized void setState(FragmentContextState newState) {
- if (state == FragmentContextState.OK) {
- state = newState;
- } else if (newState == FragmentContextState.CANCELED) {
- state = newState;
- }
+ public boolean shouldContinue() {
+ return executorState.shouldContinue();
}
public DrillbitContext getDrillbitContext() {
@@ -224,6 +204,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return frag;
}
+
+
/**
* Get this fragment's allocator.
* @return the allocator
@@ -252,11 +234,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return context.getCompiler().getImplementationClass(cg);
}
- public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+ public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
return getImplementationClass(cg.getCodeGenerator(), instanceCount);
}
- public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+ public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
return context.getCompiler().getImplementationClass(cg, instanceCount);
}
@@ -269,7 +251,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return context.getController().getTunnel(endpoint);
}
- public AccountingDataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+ public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
AccountingDataTunnel tunnel = tunnels.get(endpoint);
if (tunnel == null) {
tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
@@ -282,16 +264,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return buffers;
}
+ @VisibleForTesting
+ @Deprecated
public Throwable getFailureCause() {
- return deferredException.getException();
+ return executorState.getFailureCause();
}
+ @VisibleForTesting
+ @Deprecated
public boolean isFailed() {
- return state == FragmentContextState.FAILED;
- }
-
- public boolean isCancelled() {
- return state == FragmentContextState.CANCELED;
+ return executorState.isFailed();
}
public FunctionImplementationRegistry getFunctionRegistry() {
@@ -306,24 +288,25 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
allocator.setFragmentLimit(limit);
}
- public DeferredException getDeferredException() {
- return deferredException;
- }
-
@Override
- public void close() throws Exception {
- /*
- * TODO wait for threads working on this Fragment to terminate (or at least stop working
- * on this Fragment's query)
- */
- deferredException.suppressingClose(bufferManager);
- deferredException.suppressingClose(buffers);
- deferredException.suppressingClose(allocator);
+ public void close() {
+ waitForSendComplete();
+ suppressingClose(bufferManager);
+ suppressingClose(buffers);
+ suppressingClose(allocator);
+ }
- deferredException.close(); // must be last, as this may throw
+ private void suppressingClose(final AutoCloseable closeable) {
+ try {
+ if (closeable != null) {
+ closeable.close();
+ }
+ } catch (final Exception e) {
+ fail(e);
+ }
}
- public DrillBuf replace(DrillBuf old, int newSize) {
+ public DrillBuf replace(final DrillBuf old, final int newSize) {
return bufferManager.replace(old, newSize);
}
@@ -332,7 +315,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return bufferManager.getManagedBuffer();
}
- public DrillBuf getManagedBuffer(int size) {
+ public DrillBuf getManagedBuffer(final int size) {
return bufferManager.getManagedBuffer(size);
}
@@ -350,4 +333,30 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
sendingAccountor.waitForSendComplete();
}
+ public interface ExecutorState {
+ /**
+ * Whether execution should continue.
+ *
+ * @return false if execution should stop.
+ */
+ public boolean shouldContinue();
+
+ /**
+ * Inform the executor that an exception occurred and the fragment should be failed.
+ *
+ * @param t
+ * The exception that occurred.
+ */
+ public void fail(final Throwable t);
+
+ @VisibleForTesting
+ @Deprecated
+ public boolean isFailed();
+
+ @VisibleForTesting
+ @Deprecated
+ public Throwable getFailureCause();
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 5b7ca66..628dcd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -34,7 +34,7 @@ public abstract class BaseRootExec implements RootExec {
protected OperatorContext oContext = null;
protected FragmentContext fragmentContext = null;
- public BaseRootExec(FragmentContext fragmentContext, PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = new OperatorContext(config, fragmentContext, stats, true);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -43,7 +43,7 @@ public abstract class BaseRootExec implements RootExec {
this.fragmentContext = fragmentContext;
}
- public BaseRootExec(FragmentContext fragmentContext, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -56,7 +56,7 @@ public abstract class BaseRootExec implements RootExec {
public final boolean next() {
// Stats should have been initialized
assert stats != null;
- if (fragmentContext.isFailed()) {
+ if (!fragmentContext.shouldContinue()) {
return false;
}
try {
@@ -67,7 +67,7 @@ public abstract class BaseRootExec implements RootExec {
}
}
- public final IterOutcome next(RecordBatch b){
+ public final IterOutcome next(final RecordBatch b){
stats.stopProcessing();
IterOutcome next;
try {
@@ -90,7 +90,7 @@ public abstract class BaseRootExec implements RootExec {
public abstract boolean innerNext();
@Override
- public void receivingFragmentFinished(FragmentHandle handle) {
+ public void receivingFragmentFinished(final FragmentHandle handle) {
logger.warn("Currently not handling FinishedFragment message");
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e230fd2..b8ef690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -77,7 +76,6 @@ import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
import parquet.Preconditions;
@@ -97,12 +95,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
private RecordBatchLoader[] batchLoaders;
- private RawFragmentBatchProvider[] fragProviders;
- private FragmentContext context;
+ private final RawFragmentBatchProvider[] fragProviders;
+ private final FragmentContext context;
private BatchSchema schema;
private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
- private MergingReceiverPOP config;
+ private final MergingReceiverPOP config;
private boolean hasRun = false;
private boolean prevBatchWasFull = false;
private boolean hasMoreIncoming = true;
@@ -126,9 +124,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
- public MergingRecordBatch(FragmentContext context,
- MergingReceiverPOP config,
- RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
+ public MergingRecordBatch(final FragmentContext context,
+ final MergingReceiverPOP config,
+ final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
super(config, context, true, new OperatorContext(config, context, false));
//super(config, context);
this.fragProviders = fragProviders;
@@ -138,10 +136,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.config = config;
}
- private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+ private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
stats.startWait();
try {
- RawFragmentBatch b = provider.getNext();
+ final RawFragmentBatch b = provider.getNext();
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
@@ -177,9 +175,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
schemaChanged = true; // first iteration is always a schema change
// set up each (non-empty) incoming record batch
- List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+ final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
int p = 0;
- for (RawFragmentBatchProvider provider : fragProviders) {
+ for (final RawFragmentBatchProvider provider : fragProviders) {
RawFragmentBatch rawBatch = null;
try {
// check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
@@ -190,10 +188,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatch = getNext(provider);
}
p++;
- if (rawBatch == null && context.isCancelled()) {
+ if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -208,10 +206,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
;
}
- if (rawBatch == null && context.isCancelled()) {
+ if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -234,12 +232,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
int i = 0;
- for (RawFragmentBatch batch : incomingBatches) {
+ for (final RawFragmentBatch batch : incomingBatches) {
// initialize the incoming batchLoaders
- UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+ final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
try {
batchLoaders[i].load(rbd, batch.getBody());
- } catch(SchemaChangeException e) {
+ } catch(final SchemaChangeException e) {
logger.error("MergingReceiver failed to load record batch from remote host. {}", e);
context.fail(e);
return IterOutcome.STOP;
@@ -250,7 +248,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
- for (RecordBatchLoader loader : batchLoaders) {
+ for (final RecordBatchLoader loader : batchLoaders) {
loader.canonicalize();
}
@@ -262,15 +260,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// create the outgoing schema and vector container, and allocate the initial batch
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+ final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
int vectorCount = 0;
- for (VectorWrapper<?> v : batchLoaders[0]) {
+ for (final VectorWrapper<?> v : batchLoaders[0]) {
// add field to the output schema
bldr.addField(v.getField());
// allocate a new value vector
- ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
+ final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
++vectorCount;
}
allocateOutgoing();
@@ -286,7 +284,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// generate code for merge operations (copy and compare)
try {
merger = createMerger();
- } catch (SchemaChangeException e) {
+ } catch (final SchemaChangeException e) {
logger.error("Failed to generate code for MergingReceiver. {}", e);
context.fail(e);
return IterOutcome.STOP;
@@ -294,9 +292,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// allocate the priority queue with the generated comparator
this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
- public int compare(Node node1, Node node2) {
- int leftIndex = (node1.batchId << 16) + node1.valueIndex;
- int rightIndex = (node2.batchId << 16) + node2.valueIndex;
+ public int compare(final Node node1, final Node node2) {
+ final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
+ final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
return merger.doEval(leftIndex, rightIndex);
}
});
@@ -305,14 +303,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
for (int b = 0; b < senderCount; ++b) {
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
try {
- RawFragmentBatch batch = getNext(fragProviders[b]);
+ final RawFragmentBatch batch = getNext(fragProviders[b]);
incomingBatches[b] = batch;
if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
- if (context.isCancelled()) {
+ if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
}
@@ -332,7 +330,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (!pqueue.isEmpty()) {
// pop next value from pq and copy to outgoing batch
- Node node = pqueue.peek();
+ final Node node = pqueue.peek();
if (!copyRecordToOutgoingBatch(node)) {
logger.debug("Outgoing vectors space is full; breaking");
prevBatchWasFull = true;
@@ -355,10 +353,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(fragProviders[node.batchId]);
}
- if (nextBatch == null && context.isCancelled()) {
+ if (nextBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -369,7 +367,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// batch is empty
boolean allBatchesEmpty = true;
- for (RawFragmentBatch batch : incomingBatches) {
+ for (final RawFragmentBatch batch : incomingBatches) {
// see if all batches are empty so we can return OK_* or NONE
if (batch != null) {
allBatchesEmpty = false;
@@ -387,10 +385,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
continue;
}
- UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
+ final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
try {
batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
- } catch(SchemaChangeException ex) {
+ } catch(final SchemaChangeException ex) {
context.fail(ex);
return IterOutcome.STOP;
}
@@ -412,7 +410,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// set the value counts in the outgoing vectors
- for (VectorWrapper vw : outgoingContainer) {
+ for (final VectorWrapper vw : outgoingContainer) {
vw.getValueVector().getMutator().setValueCount(outgoingPosition);
}
@@ -448,19 +446,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
state = BatchState.DONE;
return;
}
- RawFragmentBatch batch = getNext(fragProviders[i]);
+ final RawFragmentBatch batch = getNext(fragProviders[i]);
if (batch.getHeader().getDef().getFieldCount() == 0) {
i++;
continue;
}
tempBatchHolder[i] = batch;
- for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
- ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+ for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+ final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
v.allocateNew();
}
break;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new DrillRuntimeException(e);
}
outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
@@ -473,27 +471,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public void kill(boolean sendUpstream) {
+ public void kill(final boolean sendUpstream) {
if (sendUpstream) {
informSenders();
} else {
cleanup();
- for (RawFragmentBatchProvider provider : fragProviders) {
+ for (final RawFragmentBatchProvider provider : fragProviders) {
provider.kill(context);
}
}
}
private void informSenders() {
- FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ logger.info("Informing senders of request to terminate sending.");
+ final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
.setMajorFragmentId(config.getOppositeMajorFragmentId())
.setQueryId(context.getHandle().getQueryId())
.build();
- for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
- FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+ final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
.setMinorFragmentId(providingEndpoint.getId())
.build();
- FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
.setReceiver(context.getHandle())
.setSender(sender)
.build();
@@ -504,18 +503,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
- public void failed(RpcException ex) {
+ public void failed(final RpcException ex) {
logger.warn("Failed to inform upstream that receiver is finished");
}
@Override
- public void success(Ack value, ByteBuf buffer) {
+ public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
}
@Override
- protected void killIncoming(boolean sendUpstream) {
+ protected void killIncoming(final boolean sendUpstream) {
//No op
}
@@ -535,12 +534,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
+ public TypedFieldId getValueVectorId(final SchemaPath path) {
return outgoingContainer.getValueVectorId(path);
}
@Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
return outgoingContainer.getValueAccessorById(clazz, ids);
}
@@ -549,10 +548,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return WritableBatch.get(this);
}
- private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
+ private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
- BatchSchema schema = batchLoaders[0].getSchema();
+ final BatchSchema schema = batchLoaders[0].getSchema();
for (int i = 1; i < batchLoaders.length; i++) {
if (!schema.equals(batchLoaders[i].getSchema())) {
@@ -564,8 +563,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
private void allocateOutgoing() {
- for (VectorWrapper w : outgoingContainer) {
- ValueVector v = w.getValueVector();
+ for (final VectorWrapper w : outgoingContainer) {
+ final ValueVector v = w.getValueVector();
if (v instanceof FixedWidthVector) {
AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
} else {
@@ -587,12 +586,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
try {
- CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
+ final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
ExpandableHyperContainer batch = null;
boolean first = true;
- for (RecordBatchLoader loader : batchLoaders) {
+ for (final RecordBatchLoader loader : batchLoaders) {
if (first) {
batch = new ExpandableHyperContainer(loader);
first = false;
@@ -606,7 +605,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
g.setMappingSet(COPIER_MAPPING_SET);
CopyUtil.generateCopies(g, batch, true);
g.setMappingSet(MAIN_MAPPING);
- MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
+ final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
merger.doSetup(context, batch, outgoingContainer);
return merger;
@@ -621,28 +620,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
- private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+ private void generateComparisons(final ClassGenerator g, final VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for (Ordering od : popConfig.getOrderings()) {
+ for (final Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
+ final ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
g.setMappingSet(LEFT_MAPPING);
- HoldingContainer left = g.addExpr(expr, false);
+ final HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
- HoldingContainer right = g.addExpr(expr, false);
+ final HoldingContainer right = g.addExpr(expr, false);
g.setMappingSet(MAIN_MAPPING);
// next we wrap the two comparison sides and add the expression block for the comparison.
- LogicalExpression fh =
+ final LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
context.getFunctionRegistry());
- HoldingContainer out = g.addExpr(fh, false);
- JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+ final HoldingContainer out = g.addExpr(fh, false);
+ final JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
@@ -660,8 +659,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
*
* @param node Reference to the next record to copy from the incoming batches
*/
- private boolean copyRecordToOutgoingBatch(Node node) {
- int inIndex = (node.batchId << 16) + node.valueIndex;
+ private boolean copyRecordToOutgoingBatch(final Node node) {
+ final int inIndex = (node.batchId << 16) + node.valueIndex;
merger.doCopy(inIndex, outgoingPosition);
outgoingPosition++;
if (outgoingPosition == OUTGOING_BATCH_SIZE) {
@@ -677,7 +676,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public class Node {
public int batchId; // incoming batch
public int valueIndex; // value within the batch
- Node(int batchId, int valueIndex) {
+ Node(final int batchId, final int valueIndex) {
this.batchId = batchId;
this.valueIndex = valueIndex;
}
@@ -687,7 +686,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void cleanup() {
outgoingContainer.clear();
if (batchLoaders != null) {
- for (RecordBatchLoader rbl : batchLoaders) {
+ for (final RecordBatchLoader rbl : batchLoaders) {
if (rbl != null) {
rbl.clear();
}
@@ -695,7 +694,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
oContext.close();
if (fragProviders != null) {
- for (RawFragmentBatchProvider f : fragProviders) {
+ for (final RawFragmentBatchProvider f : fragProviders) {
f.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c50cb8a..c2d6166 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,10 +20,8 @@ package org.apache.drill.exec.physical.impl.producer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -42,16 +40,16 @@ import org.apache.drill.exec.vector.ValueVector;
public class ProducerConsumerBatch extends AbstractRecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
- private RecordBatch incoming;
- private Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
+ private final RecordBatch incoming;
+ private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
private boolean running = false;
- private BlockingDeque<RecordBatchDataWrapper> queue;
+ private final BlockingDeque<RecordBatchDataWrapper> queue;
private int recordCount;
private BatchSchema schema;
private boolean stop = false;
private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
- protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
this.queue = new LinkedBlockingDeque<>(popConfig.getSize());
@@ -68,8 +66,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
stats.startWait();
wrapper = queue.take();
logger.debug("Got batch from queue");
- } catch (InterruptedException e) {
- if (!(context.isCancelled() || context.isFailed())) {
+ } catch (final InterruptedException e) {
+ if (!context.shouldContinue()) {
context.fail(e);
}
return IterOutcome.STOP;
@@ -84,30 +82,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
recordCount = wrapper.batch.getRecordCount();
- boolean newSchema = load(wrapper.batch);
+ final boolean newSchema = load(wrapper.batch);
return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
}
- private boolean load(RecordBatchData batch) {
- VectorContainer newContainer = batch.getContainer();
+ private boolean load(final RecordBatchData batch) {
+ final VectorContainer newContainer = batch.getContainer();
if (schema != null && newContainer.getSchema().equals(schema)) {
container.zeroVectors();
- BatchSchema schema = container.getSchema();
+ final BatchSchema schema = container.getSchema();
for (int i = 0; i < container.getNumberOfColumns(); i++) {
- MaterializedField field = schema.getColumn(i);
- MajorType type = field.getType();
- ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+ final MaterializedField field = schema.getColumn(i);
+ final MajorType type = field.getType();
+ final ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
- ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+ final ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
- TransferPair tp = vIn.makeTransferPair(vOut);
+ final TransferPair tp = vIn.makeTransferPair(vOut);
tp.transfer();
}
return false;
} else {
container.clear();
- for (VectorWrapper w : newContainer) {
+ for (final VectorWrapper w : newContainer) {
container.add(w.getValueVector());
}
container.buildSchema(SelectionVectorMode.NONE);
@@ -128,7 +126,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
outer:
while (true) {
- IterOutcome upstream = incoming.next();
+ final IterOutcome upstream = incoming.next();
switch (upstream) {
case NONE:
stop = true;
@@ -146,7 +144,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
throw new UnsupportedOperationException();
}
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Producer thread is interrupted.", e);
// TODO InterruptedException
} finally {
@@ -154,7 +152,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
try {
clearQueue();
queue.put(new RecordBatchDataWrapper(null, true, false));
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
// TODO InterruptedException
}
@@ -177,12 +175,12 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
@Override
- protected void killIncoming(boolean sendUpstream) {
+ protected void killIncoming(final boolean sendUpstream) {
stop = true;
producer.interrupt();
try {
producer.join();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for producer thread");
// TODO InterruptedException
}
@@ -193,7 +191,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
stop = true;
try {
cleanUpLatch.await();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
// TODO InterruptedException
} finally {
@@ -213,7 +211,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
boolean finished;
boolean failed;
- RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean failed) {
+ RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
this.batch = batch;
this.finished = finished;
this.failed = failed;