You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/26 12:44:30 UTC
[06/11] drill git commit: DRILL-5730: Mock testing improvements and interface improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index f4a9825..aa067cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -145,7 +145,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
try {
doWork();
} catch (DrillException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
cleanup();
return IterOutcome.STOP;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index c212593..9cde1a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -201,7 +201,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// where it would have been passed to context.fail()
// passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
// right away, this will also make sure we collect any additional exception we may get while cleaning up
- context.fail(e);
+ context.getExecutorState().fail(e);
}
}
}
@@ -483,7 +483,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
mSorter.sort(this.container);
// sort may have prematurely exited due to should continue returning false.
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
@@ -522,12 +522,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} catch (SchemaChangeException ex) {
kill(false);
- context.fail(UserException.unsupportedError(ex)
+ context.getExecutorState().fail(UserException.unsupportedError(ex)
.message("Sort doesn't currently support sorts with changing schemas").build(logger));
return IterOutcome.STOP;
} catch(ClassTransformationException | IOException ex) {
kill(false);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
} catch (UnsupportedOperationException e) {
throw new RuntimeException(e);
@@ -650,7 +650,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
try {
Thread.sleep(waitTime * 1000);
} catch(final InterruptedException e) {
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
throw e;
}
}
@@ -688,11 +688,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException {
- return createNewMSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+ return createNewMSorter(context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
- private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
- throws ClassTransformationException, IOException, SchemaChangeException{
+ private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet
+ rightMapping)
+ throws ClassTransformationException, IOException, SchemaChangeException {
CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions());
ClassGenerator<MSorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
@@ -735,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch)
- throws ClassTransformationException, IOException, SchemaChangeException{
+ throws ClassTransformationException, IOException, SchemaChangeException {
CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions());
cg.plainJavaCapable(true); // This class can generate plain-old Java.
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index e579fc2..6a60196 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -22,19 +22,19 @@ import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
import com.google.common.base.Preconditions;
+import org.apache.drill.exec.server.options.OptionManager;
public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
@Override
- public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+ public AbstractRecordBatch<ExternalSort> getBatch(ExecutorFragmentContext context, ExternalSort config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 2f3d2f6..9b69170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -130,7 +130,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
while (runStarts.size() > 1) {
// check if we're cancelled/failed frequently
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
index 733ea5e..4dbee3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
public interface SingleBatchSorter {
- public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
+ public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
public void sort(SelectionVector2 vector2) throws SchemaChangeException;
public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION =
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 0f4680d..4a1af4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
private SelectionVector2 vector2;
@Override
- public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
+ public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
Preconditions.checkNotNull(vector2);
this.vector2 = vector2;
try {
@@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
}
}
- public abstract void doSetup(@Named("context") FragmentContextInterface context,
+ public abstract void doSetup(@Named("context") FragmentContext context,
@Named("incoming") VectorAccessible incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9150fe3..23e66a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -135,8 +135,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
* into new batches of a size determined by that operator.</li>
* <li>A series of batches, without a selection vector, if the sort spills to
* disk. In this case, the downstream operator will still be a selection vector
- * remover, but there is nothing for that operator to remove. Each batch is
- * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
+ * remover, but there is nothing for that operator to remove.
* </ul>
* Note that, even in the in-memory sort case, this operator could do the copying
* to eliminate the extra selection vector remover. That is left as an exercise
@@ -375,7 +374,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// sort may have prematurely exited due to shouldContinue() returning false.
- if (! context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
sortState = SortState.DONE;
return IterOutcome.STOP;
}
@@ -440,8 +439,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
/**
* Handle a new schema from upstream. The ESB is quite limited in its ability
* to handle schema changes.
- *
- * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
*/
private void setupSchema() {
@@ -482,6 +479,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
* <p>
* Some Drill code ends up calling close() two or more times. The code
* here protects itself from these undesirable semantics.
+ * </p>
*/
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 698e32f..625d360 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
*/
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
- private FragmentContextInterface context;
+ private FragmentContext context;
/**
* Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
private int desiredRecordBatchCount;
@Override
- public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4,
+ public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
@@ -162,7 +162,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
final int totalCount = this.vector4.getTotalCount();
// check if we're cancelled/failed recently
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return; }
int outIndex = 0;
@@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
}
}
- public abstract void doSetup(@Named("context") FragmentContextInterface context,
+ public abstract void doSetup(@Named("context") FragmentContext context,
@Named("incoming") VectorContainer incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 428f6f8..8eae8b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
*/
public interface MSorter {
- public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4,
+ public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4,
VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
public void sort();
public SelectionVector4 getSV4();
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index eb90614..015d078 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -103,7 +103,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
public final IterOutcome next(final RecordBatch b) {
- if(!context.shouldContinue()) {
+ if(!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
return next(0, b);
@@ -113,7 +113,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
IterOutcome next = null;
stats.stopProcessing();
try{
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
next = b.next();
@@ -147,7 +147,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return IterOutcome.NONE;
case OUT_OF_MEMORY:
// because we don't support schema changes, it is safe to fail the query right away
- context.fail(UserException.memoryError()
+ context.getExecutorState().fail(UserException.memoryError()
.build(logger));
// FALL-THROUGH
case STOP:
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 4a9828c..07312a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -85,7 +85,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
} finally {
stats.stopSetup();
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index e7c0ee5..381071f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -21,14 +21,10 @@ import io.netty.buffer.ByteBuf;
import java.util.concurrent.Semaphore;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.FutureBitCommand;
import org.apache.drill.exec.rpc.ListeningCommand;
-import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.testing.ControlsInjector;
@@ -96,23 +92,6 @@ public class DataTunnel {
}
}
- // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
- SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
- try{
- sendingSemaphore.acquire();
- manager.runCommand(b);
- }catch(final InterruptedException e){
- b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
-
- // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
- // interruption and respond to it if it wants to.
- Thread.currentThread().interrupt();
- }
- return b.getFuture();
- }
-
-
private class ThrottlingOutcomeListener implements RpcOutcomeListener<Ack>{
RpcOutcomeListener<Ack> inner;
@@ -166,26 +145,4 @@ public class DataTunnel {
super.connectionFailed(type, t);
}
}
-
- private class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
- final FragmentWritableBatch batch;
- final FragmentContext context;
-
- public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
- super();
- this.batch = batch;
- this.context = context;
- }
-
- @Override
- public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) {
- connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
- }
-
- @Override
- public String toString() {
- return "SendBatch [batch.header=" + batch.getHeader() + "]";
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 1f9c81c..5a0e14d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -252,6 +252,14 @@ public class BootStrapContext implements AutoCloseable {
}
}
+ if (scanExecutor != null) {
+ scanExecutor.shutdown();
+ }
+
+ if (scanDecodeExecutor != null) {
+ scanDecodeExecutor.shutdownNow();
+ }
+
try {
AutoCloseables.close(allocator, authProvider);
shutdown(loop);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
index 90f5623..f0b7243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
@@ -19,11 +19,12 @@ package org.apache.drill.exec.server.options;
import com.google.common.collect.Maps;
import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ops.FragmentContextImpl;
import java.util.Map;
/**
- * {@link OptionManager} that holds options within {@link org.apache.drill.exec.ops.FragmentContext}.
+ * {@link OptionManager} that holds options within {@link FragmentContextImpl}.
*/
public class FragmentOptionManager extends InMemoryOptionManager {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index f9d44cc..62d46d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContextInterface;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -52,15 +51,6 @@ public class ColumnExplorer {
* between actual table columns, partition columns and implicit file columns.
* Also populates map with implicit columns names as keys and their values
*/
- public ColumnExplorer(FragmentContextInterface context, List<SchemaPath> columns) {
- this(context.getOptions(), columns);
- }
-
- /**
- * Helper class that encapsulates logic for sorting out columns
- * between actual table columns, partition columns and implicit file columns.
- * Also populates map with implicit columns names as keys and their values
- */
public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
this.columns = columns;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index f81f74e..d407ec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -127,7 +127,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
@SuppressWarnings("resource")
CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
- final ColumnExplorer columnExplorer = new ColumnExplorer(context, scan.getColumns());
+ final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
if (!columnExplorer.isStarQuery()) {
scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(scan, context, oContext, readers, implicitColumns);
+ return new ScanBatch(context, oContext, readers, implicitColumns);
}
public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
index f9dfd8b..4e71795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
-
+public class EasyReaderBatchCreator implements BatchCreator<EasySubScan> {
@Override
- public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasySubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children == null || children.isEmpty();
return config.getFormatPlugin().getReaderBatch(context, config);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
index bfb4188..f3988b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
-
@Override
- public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasyWriter config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index 8442c32..c7b8cdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -21,16 +21,14 @@ import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class);
-
@Override
- public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
+ public ScanBatch getBatch(ExecutorFragmentContext context, DirectSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4abf7a5..372a91d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -116,7 +116,7 @@ public class JSONRecordReader extends AbstractRecordReader {
this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
this.enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
- this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+ this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
setColumns(columns);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index a9a30e4..8209252 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -86,7 +86,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
if (context.getOptions().getOption(ExecConstants.ENABLE_NEW_TEXT_READER_KEY).bool_val == true) {
TextParsingSettings settings = new TextParsingSettings();
settings.set((TextFormatConfig)formatConfig);
- return new CompliantTextRecordReader(split, dfs, context, settings, columns);
+ return new CompliantTextRecordReader(split, dfs, settings, columns);
} else {
char delim = ((TextFormatConfig)formatConfig).getFieldDelimiter();
return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 7009584..2eafdd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -67,7 +66,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
// operator context for OutputMutator
private OperatorContext oContext;
- public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, FragmentContext context, TextParsingSettings settings, List<SchemaPath> columns) {
+ public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, TextParsingSettings settings, List<SchemaPath> columns) {
this.split = split;
this.settings = settings;
this.dfs = dfs;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index ce05543..76e3ae2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -21,17 +21,15 @@ import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class);
-
+public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan> {
@Override
- public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
+ public ScanBatch getBatch(ExecutorFragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(), context.getOptions());
return new ScanBatch(config, context, Collections.singletonList(rr));
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
index ac9cb6a..ed01376 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -58,7 +57,7 @@ public class ExtendedMockRecordReader extends AbstractRecordReader {
private final MockScanEntry config;
private final ColumnDef fields[];
- public ExtendedMockRecordReader(FragmentContext context, MockScanEntry config) {
+ public ExtendedMockRecordReader(MockScanEntry config) {
this.config = config;
fields = buildColumnDefs();
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 108a621..5c9a87a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -35,8 +35,6 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
public class MockRecordReader extends AbstractRecordReader {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
private final MockScanEntry config;
private final FragmentContext context;
private ValueVector[] valueVectors;
@@ -62,7 +60,7 @@ public class MockRecordReader extends AbstractRecordReader {
return x;
}
- private MaterializedField getVector(String name, MajorType type, int length) {
+ private MaterializedField getVector(String name, MajorType type) {
assert context != null : "Context shouldn't be null.";
final MaterializedField f = MaterializedField.create(name, type);
return f;
@@ -80,7 +78,7 @@ public class MockRecordReader extends AbstractRecordReader {
for (int i = 0; i < config.getTypes().length; i++) {
final MajorType type = config.getTypes()[i].getMajorType();
- final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+ final MaterializedField field = getVector(config.getTypes()[i].getName(), type);
final Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
valueVectors[i] = output.addField(field, vvClass);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 8f89eff..01a102d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.mock;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -32,17 +32,15 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
-
@Override
- public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
+ public ScanBatch getBatch(ExecutorFragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
final List<MockScanEntry> entries = config.getReadEntries();
final List<RecordReader> readers = Lists.newArrayList();
for(final MockTableDef.MockScanEntry e : entries) {
if ( e.isExtended( ) ) {
- readers.add(new ExtendedMockRecordReader(context, e));
+ readers.add(new ExtendedMockRecordReader(e));
} else {
readers.add(new MockRecordReader(context, e));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
index bc4be13..337f220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
import org.apache.drill.exec.expr.stat.RangeExprEvaluator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -74,7 +74,7 @@ public class ParquetRGFilterEvaluator {
public static boolean canDrop(LogicalExpression expr, Map<SchemaPath, ColumnStatistics> columnStatisticsMap,
- long rowCount, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry) {
+ long rowCount, UdfUtilities udfUtilities, FunctionLookupContext functionImplementationRegistry) {
ErrorCollector errorCollector = new ErrorCollectorImpl();
LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
expr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bb0b65f..62948b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -27,7 +27,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -49,7 +49,6 @@ import org.apache.parquet.schema.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
@@ -59,12 +58,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
@SuppressWarnings("resource")
@Override
- public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
+ public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
OperatorContext oContext = context.newOperatorContext(rowGroupScan);
- final ColumnExplorer columnExplorer = new ColumnExplorer(context, rowGroupScan.getColumns());
+ final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
if (!columnExplorer.isStarQuery()) {
rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
@@ -154,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
+ return new ScanBatch(context, oContext, readers, implicitColumns);
}
private static boolean isComplex(ParquetMetadata footer) {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
index 79c5709..cfb65d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.parquet;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class);
@Override
- public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
+ public WriterRecordBatch getBatch(ExecutorFragmentContext context, ParquetWriter config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index ad1c4bf..04e76b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -38,8 +38,7 @@ public class ParquetToDrillTypeConverter {
}
private static TypeProtos.MinorType getMinorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
- ConvertedType convertedType, int precision, int scale,
- OptionManager options) {
+ ConvertedType convertedType, int precision, OptionManager options) {
switch (primitiveTypeName) {
@@ -139,7 +138,7 @@ public class ParquetToDrillTypeConverter {
public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
TypeProtos.DataMode mode, ConvertedType convertedType, int precision, int scale,
OptionManager options) {
- MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, scale, options);
+ MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, options);
TypeProtos.MajorType.Builder typeBuilder = TypeProtos.MajorType.newBuilder().setMinorType(minorType).setMode(mode);
if (CoreDecimalUtility.isDecimalType(minorType)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
index 6294655..e302e7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
@@ -22,8 +22,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter;
import org.apache.parquet.column.ColumnDescriptor;
@@ -37,7 +35,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.joda.time.DateTimeConstants;
-import org.joda.time.DateTimeUtils;
import java.util.ArrayList;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
index 15a56a2..554db10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
import java.util.TimeZone;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnectionConfig;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -45,13 +45,13 @@ public class BitToUserConnectionIterator implements Iterator<Object> {
private String queryingUsername;
private boolean isAdmin;
- public BitToUserConnectionIterator(FragmentContext context) {
+ public BitToUserConnectionIterator(ExecutorFragmentContext context) {
queryingUsername = context.getQueryUserName();
isAdmin = hasAdminPrivileges(context);
itr = iterateConnectionInfo(context);
}
- private boolean hasAdminPrivileges(FragmentContext context) {
+ private boolean hasAdminPrivileges(ExecutorFragmentContext context) {
OptionManager options = context.getOptions();
if (context.isUserAuthenticationEnabled() &&
!ImpersonationUtil.hasAdminPrivileges(
@@ -65,11 +65,10 @@ public class BitToUserConnectionIterator implements Iterator<Object> {
return true;
}
- private Iterator<ConnectionInfo> iterateConnectionInfo(FragmentContext context) {
- Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections =
- context.getDrillbitContext().getUserConnections();
+ private Iterator<ConnectionInfo> iterateConnectionInfo(ExecutorFragmentContext context) {
+ Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections = context.getUserConnections();
- String hostname = context.getIdentity().getAddress();
+ String hostname = context.getEndpoint().getAddress();
List<ConnectionInfo> connectionInfos = new LinkedList<ConnectionInfo>();
for (Entry<BitToUserConnection, BitToUserConnectionConfig> connection : activeConnections) {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index dc4e7c7..08d3706 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys;
import java.util.Iterator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
public class DrillbitIterator implements Iterator<Object> {
@@ -28,9 +28,9 @@ public class DrillbitIterator implements Iterator<Object> {
private Iterator<DrillbitEndpoint> endpoints;
private DrillbitEndpoint current;
- public DrillbitIterator(FragmentContext c) {
- this.endpoints = c.getDrillbitContext().getAvailableBits().iterator();
- this.current = c.getIdentity();
+ public DrillbitIterator(ExecutorFragmentContext c) {
+ this.endpoints = c.getBits().iterator();
+ this.current = c.getEndpoint();
}
public static class DrillbitInstance {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
index e624ada..8d437a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -25,15 +25,15 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
public class MemoryIterator implements Iterator<Object> {
private boolean beforeFirst = true;
- private final FragmentContext context;
+ private final ExecutorFragmentContext context;
- public MemoryIterator(final FragmentContext context) {
+ public MemoryIterator(final ExecutorFragmentContext context) {
this.context = context;
}
@@ -50,7 +50,7 @@ public class MemoryIterator implements Iterator<Object> {
beforeFirst = false;
final MemoryInfo memoryInfo = new MemoryInfo();
- final DrillbitEndpoint endpoint = context.getIdentity();
+ final DrillbitEndpoint endpoint = context.getEndpoint();
memoryInfo.hostname = endpoint.getAddress();
memoryInfo.user_port = endpoint.getUserPort();
@@ -62,7 +62,7 @@ public class MemoryIterator implements Iterator<Object> {
memoryInfo.jvm_direct_current = directBean.getMemoryUsed();
- memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
+ memoryInfo.direct_current = context.getAllocator().getAllocatedMemory();
memoryInfo.direct_max = DrillConfig.getMaxDirectMemory();
return memoryInfo;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
index e9282c3..eef6604 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.sys;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
@@ -37,7 +37,7 @@ public class ProfileInfoIterator extends ProfileIterator {
private final Iterator<ProfileInfo> itr;
- public ProfileInfoIterator(FragmentContext context) {
+ public ProfileInfoIterator(ExecutorFragmentContext context) {
super(context);
itr = iterateProfileInfo();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
index f1d2075..6112eb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.options.OptionManager;
@@ -37,17 +37,15 @@ public abstract class ProfileIterator implements Iterator<Object> {
protected final String queryingUsername;
protected final boolean isAdmin;
- public ProfileIterator(FragmentContext context) {
+ public ProfileIterator(ExecutorFragmentContext context) {
//Grab profile Store Context
- profileStoreContext = context
- .getDrillbitContext()
- .getProfileStoreContext();
+ profileStoreContext = context.getProfileStoreContext();
queryingUsername = context.getQueryUserName();
isAdmin = hasAdminPrivileges(context);
}
- protected boolean hasAdminPrivileges(FragmentContext context) {
+ protected boolean hasAdminPrivileges(ExecutorFragmentContext context) {
OptionManager options = context.getOptions();
if (context.isUserAuthenticationEnabled() &&
!ImpersonationUtil.hasAdminPrivileges(
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
index fcc2921..67f1165 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
@@ -23,7 +23,7 @@ import java.util.Map.Entry;
import javax.annotation.Nullable;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.serialization.InstanceSerializer;
@@ -40,7 +40,7 @@ public class ProfileJsonIterator extends ProfileIterator {
private final InstanceSerializer<QueryProfile> profileSerializer;
private final Iterator<ProfileJson> itr;
- public ProfileJsonIterator(FragmentContext context) {
+ public ProfileJsonIterator(ExecutorFragmentContext context) {
super(context);
//Holding a serializer (for JSON extract)
profileSerializer = profileStoreContext.
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 034f70c..a49ff9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys;
import java.util.Iterator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
/**
@@ -33,90 +33,88 @@ import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
public enum SystemTable {
OPTION("options", false, OptionValueWrapper.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_PUBLIC);
}
},
OPTION_VAL("options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new ExtendedOptionIterator(context, false);
}
},
INTERNAL_OPTIONS("internal_options", false, OptionValueWrapper.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_INTERNAL);
}
},
INTERNAL_OPTIONS_VAL("internal_options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new ExtendedOptionIterator(context, true);
}
},
BOOT("boot", false, OptionValueWrapper.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new OptionIterator(context, OptionIterator.Mode.BOOT);
}
},
DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new DrillbitIterator(context);
}
},
VERSION("version", false, VersionIterator.VersionInfo.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new VersionIterator();
}
},
MEMORY("memory", true, MemoryIterator.MemoryInfo.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new MemoryIterator(context);
}
},
CONNECTIONS("connections", true, BitToUserConnectionIterator.ConnectionInfo.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new BitToUserConnectionIterator(context);
}
},
PROFILES("profiles", false, ProfileInfoIterator.ProfileInfo.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new ProfileInfoIterator(context);
}
},
PROFILES_JSON("profiles_json", false, ProfileJsonIterator.ProfileJson.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new ProfileJsonIterator(context);
}
},
THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) {
@Override
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
return new ThreadsIterator(context);
}
};
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTable.class);
-
private final String tableName;
private final boolean distributed;
private final Class<?> pojoClass;
@@ -127,7 +125,7 @@ public enum SystemTable {
this.pojoClass = pojoClass;
}
- public Iterator<Object> getIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
throw new UnsupportedOperationException(tableName + " must override this method.");
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index ab87a4a..c0ef0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -23,7 +23,7 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -39,8 +39,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan,
- final List<RecordBatch> children)
+ public ScanBatch getBatch(final ExecutorFragmentContext context, final SystemTableScan scan,
+ final List<RecordBatch> children)
throws ExecutionSetupException {
final SystemTable table = scan.getTable();
final Iterator<Object> iterator = table.getIterator(context);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
index 681119d..567d299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
@@ -21,15 +21,15 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Iterator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
public class ThreadsIterator implements Iterator<Object> {
private boolean beforeFirst = true;
- private final FragmentContext context;
+ private final ExecutorFragmentContext context;
- public ThreadsIterator(final FragmentContext context) {
+ public ThreadsIterator(final ExecutorFragmentContext context) {
this.context = context;
}
@@ -46,7 +46,7 @@ public class ThreadsIterator implements Iterator<Object> {
beforeFirst = false;
final ThreadsInfo threadsInfo = new ThreadsInfo();
- final DrillbitEndpoint endpoint = context.getIdentity();
+ final DrillbitEndpoint endpoint = context.getEndpoint();
threadsInfo.hostname = endpoint.getAddress();
threadsInfo.user_port = endpoint.getUserPort();
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
index c7bdbad..cfcb9e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.testing;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
import org.slf4j.Logger;
public interface ControlsInjector {
@@ -43,19 +43,6 @@ public interface ControlsInjector {
void injectUnchecked(ExecutionControls executionControls, String desc);
/**
- * Inject (throw) an unchecked exception at this point, if the fragmentContext is not null,
- * an injection is specified, and it is time for it to be thrown.
- * <p/>
- * <p>Implementors use this in their code at a site where they want to simulate an exception
- * during testing.
- *
- * @param fragmentContext fragmentContext used to retrieve the controls, can be null
- * @param desc the site description
- * throws the exception specified by the injection, if it is time
- */
- void injectUnchecked(FragmentContext fragmentContext, String desc);
-
- /**
* Inject (throw) a checked exception at this point, if an injection is specified, and it is time
* for it to be thrown.
* <p/>
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 82646ea..78c510a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
@@ -131,6 +132,11 @@ public final class ExecutionControls {
private final DrillbitEndpoint endpoint; // the current endpoint
+ @VisibleForTesting
+ public ExecutionControls(final OptionManager options) {
+ this(options, null);
+ }
+
public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) {
this.endpoint = endpoint;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 3a74fee..e5234f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -19,16 +19,15 @@ package org.apache.drill.exec.testing;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
/**
* Injects exceptions and pauses at execution time for testing. Any class that wants to simulate exceptions
* or inject pauses for testing should have its own private static instance of an injector (similar to the use
- * of loggers). Injection site either use {@link org.apache.drill.exec.ops.FragmentContext} or
- * {@link org.apache.drill.exec.ops.QueryContext}. See {@link org.apache.drill.exec.testing.TestExceptionInjection} and
- * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use.
+ * of loggers). Injection site either use {@link FragmentContextImpl} or
+ * {@link org.apache.drill.exec.ops.QueryContext}.
* See {@link ControlsInjector} for documentation.
*/
public class ExecutionControlsInjector implements ControlsInjector {
@@ -60,13 +59,6 @@ public class ExecutionControlsInjector implements ControlsInjector {
}
@Override
- public void injectUnchecked(final FragmentContext fragmentContext, final String desc) {
- if (fragmentContext != null) {
- injectUnchecked(fragmentContext.getExecutionControls(), desc);
- }
- }
-
- @Override
public <T extends Throwable> void injectChecked(final ExecutionControls executionControls, final String desc,
final Class<T> exceptionClass) throws T {
Preconditions.checkNotNull(executionControls);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index b8ec44d..ae853d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.testing;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
import org.slf4j.Logger;
/**
@@ -48,10 +48,6 @@ public final class NoOpControlsInjector implements ControlsInjector {
}
@Override
- public void injectUnchecked(final FragmentContext fragmentContext, final String desc) {
- }
-
- @Override
public <T extends Throwable> void injectChecked(
final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 8a3e5b6..793cc01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -29,9 +29,7 @@ import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
import com.google.common.base.Preconditions;
-public abstract class AbstractDataCollector implements DataCollector{
-
- // private final List<MinorFragmentEndpoint> incoming;
+public abstract class AbstractDataCollector implements DataCollector {
private final int oppositeMajorFragmentId;
private final AtomicIntegerArray remainders;
private final AtomicInteger remainingRequired;
@@ -42,7 +40,6 @@ public abstract class AbstractDataCollector implements DataCollector{
/**
* @param parentAccounter
- * @param receiver
* @param numBuffers Number of RawBatchBuffer inputs required to store the incoming data
* @param bufferCapacity Capacity of each RawBatchBuffer.
* @param context
@@ -74,7 +71,7 @@ public abstract class AbstractDataCollector implements DataCollector{
if (spooling) {
buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i);
} else {
- buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId());
+ buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index f15a3e6..382ab1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -35,13 +35,13 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
}
protected interface BufferQueue<T> {
- public void addOomBatch(RawFragmentBatch batch);
- public RawFragmentBatch poll() throws IOException;
- public RawFragmentBatch take() throws IOException, InterruptedException;
- public boolean checkForOutOfMemory();
- public int size();
- public boolean isEmpty();
- public void add(T obj);
+ void addOomBatch(RawFragmentBatch batch);
+ RawFragmentBatch poll() throws IOException;
+ RawFragmentBatch take() throws IOException, InterruptedException;
+ boolean checkForOutOfMemory();
+ int size();
+ boolean isEmpty();
+ void add(T obj);
}
protected BufferQueue<T> bufferQueue;
@@ -74,7 +74,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
// if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
// ensure that tests run.
- if (context != null && !context.shouldContinue()) {
+ if (context != null && !context.getExecutorState().shouldContinue()) {
this.kill(context);
}
@@ -102,14 +102,14 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
// ## Add assertion that all acks have been sent. TODO
@Override
public void close() {
- if (!isTerminated() && context.shouldContinue()) {
+ if (!isTerminated() && context.getExecutorState().shouldContinue()) {
final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
throw new IllegalStateException(msg);
}
if (!bufferQueue.isEmpty()) {
- if (context.shouldContinue()) {
- context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+ if (context.getExecutorState().shouldContinue()) {
+ context.getExecutorState().fail(new IllegalStateException("Batches still in queue during cleanup"));
logger.error("{} Batches in queue.", bufferQueue.size());
}
clearBufferWithBody();
@@ -133,7 +133,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
batch = bufferQueue.poll();
assertAckSent(batch);
} catch (IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
continue;
}
if (batch.getBody() != null) {
@@ -172,7 +172,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
} catch (final InterruptedException e) {
// We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
kill(context);
} else {
throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
@@ -184,7 +184,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
return null;
}
- if (context.isOverMemoryLimit()) {
+ if (context.getAllocator().isOverLimit()) {
outOfMemory.set(true);
}