You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:53 UTC
[06/10] git commit: DRILL-274: Spooling batch buffer
DRILL-274: Spooling batch buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2c811a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2c811a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2c811a83
Branch: refs/heads/master
Commit: 2c811a83b30295fb39e1540ce76fbf54768ed50c
Parents: 6c0389f
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:04:44 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:04:44 2013 -0800
----------------------------------------------------------------------
distribution/src/resources/drill-override.conf | 35 ++-
.../org/apache/drill/exec/ExecConstants.java | 5 +
.../apache/drill/exec/ops/FragmentContext.java | 10 +-
.../exec/physical/impl/WireRecordBatch.java | 15 +-
.../impl/mergereceiver/MergingRecordBatch.java | 15 +-
.../exec/record/RawFragmentBatchProvider.java | 4 +-
.../exec/store/LocalSyncableFileSystem.java | 183 ++++++++++++++
.../work/batch/AbstractFragmentCollector.java | 22 +-
.../drill/exec/work/batch/BatchCollector.java | 4 +-
.../exec/work/batch/BitComHandlerImpl.java | 2 +-
.../drill/exec/work/batch/IncomingBuffers.java | 24 +-
.../drill/exec/work/batch/MergingCollector.java | 5 +-
.../exec/work/batch/PartitionedCollector.java | 5 +-
.../drill/exec/work/batch/RawBatchBuffer.java | 4 +-
.../exec/work/batch/SpoolingRawBatchBuffer.java | 250 +++++++++++++++++++
.../work/batch/UnlimitedRawBatchBuffer.java | 4 +
.../work/foreman/RunningFragmentManager.java | 6 +-
.../work/fragment/RemoteFragmentHandler.java | 5 +-
.../src/main/resources/drill-module.conf | 11 +-
.../apache/drill/exec/client/DumpCatTest.java | 2 +-
.../exec/fn/impl/TestRepeatedFunction.java | 2 +-
.../physical/impl/TestComparisonFunctions.java | 2 +-
.../exec/physical/impl/TestOptiqPlans.java | 4 +-
.../exec/physical/impl/TestSimpleFunctions.java | 10 +-
.../drill/exec/physical/impl/agg/TestAgg.java | 2 +-
.../physical/impl/filter/TestSimpleFilter.java | 4 +-
.../exec/physical/impl/join/TestMergeJoin.java | 8 +-
.../physical/impl/limit/TestSimpleLimit.java | 4 +-
.../impl/project/TestSimpleProjection.java | 2 +-
.../exec/physical/impl/sort/TestSimpleSort.java | 4 +-
.../physical/impl/svremover/TestSVRemover.java | 2 +-
.../impl/trace/TestTraceMultiRecordBatch.java | 2 +-
.../impl/trace/TestTraceOutputDump.java | 2 +-
.../physical/impl/union/TestSimpleUnion.java | 2 +-
.../apache/drill/exec/work/batch/FileTest.java | 66 +++++
.../exec/work/batch/TestSpoolingBuffer.java | 62 +++++
.../src/test/resources/drill-module.conf | 4 +-
.../test/resources/drill-spool-test-module.conf | 83 ++++++
.../resources/work/batch/multiple_exchange.json | 48 ++++
39 files changed, 851 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index 18a2a6a..5fe2362 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -16,6 +16,9 @@
// This file tells Drill to consider this module when class path scanning.
// This file can also include any supplementary configuration information.
// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
drill.exec: {
cluster-id: "drillbits1"
rpc: {
@@ -46,11 +49,19 @@ drill.exec: {
optimizer: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
},
+ functions: ["org.apache.drill.expr.fn.impl"],
storage: {
- packages += "org.apache.drill.exec.store"
- }
- metrics : {
- context: "drillbit"
+ packages += "org.apache.drill.exec.store"
+ },
+ metrics : {
+ context: "drillbit",
+ jmx: {
+ enabled : true
+ },
+ log: {
+ enabled : false,
+ interval : 60
+ }
},
zk: {
connect: "localhost:2181",
@@ -60,7 +71,7 @@ drill.exec: {
retry: {
count: 7200,
delay: 500
- }
+ }
},
functions: ["org.apache.drill.expr.fn.impl"],
network: {
@@ -70,8 +81,18 @@ drill.exec: {
max.width.per.endpoint: 5,
global.max.width: 100,
executor.threads: 4
- }
+ },
trace: {
- directory: "/tmp"
+ directory: "/var/log/drill",
+ filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 100000000
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 36504f6..5336c0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -45,4 +45,9 @@ public interface ExecConstants {
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
+ public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
+ public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
+ public static final String SPOOLING_BUFFER_IMPL = "drill.exec.spooling.impl";
+ public static final String SPOOLING_BUFFER_DELETE = "drill.exec.spooling.delete";
+ public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.spooling.size";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 674dafc..f75cf5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
@@ -57,14 +58,14 @@ public class FragmentContext {
public final Timer fragmentTime;
private final FragmentHandle handle;
private final UserClientConnection connection;
- private final IncomingBuffers buffers;
+ private IncomingBuffers buffers;
private volatile Throwable failureCause;
private volatile boolean failed = false;
private final FunctionImplementationRegistry funcRegistry;
private final QueryClassLoader loader;
private final ClassTransformer transformer;
- public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers, FunctionImplementationRegistry funcRegistry) {
+ public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) {
this.loader = new QueryClassLoader(true);
this.transformer = new ClassTransformer();
this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
@@ -74,10 +75,13 @@ public class FragmentContext {
this.context = dbContext;
this.connection = connection;
this.handle = handle;
- this.buffers = buffers;
this.funcRegistry = funcRegistry;
}
+ public void setBuffers(IncomingBuffers buffers) {
+ this.buffers = buffers;
+ }
+
public void fail(Throwable cause) {
logger.debug("Fragment Context received failure. {}", cause);
failed = true;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 0b0214a..6a16367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
@@ -96,14 +97,14 @@ public class WireRecordBatch implements RecordBatch{
@Override
public IterOutcome next() {
- RawFragmentBatch batch = fragProvider.getNext();
+ try{
+ RawFragmentBatch batch = fragProvider.getNext();
- // skip over empty batches. we do this since these are basically control messages.
- while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
- batch = fragProvider.getNext();
- }
+ // skip over empty batches. we do this since these are basically control messages.
+ while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
+ batch = fragProvider.getNext();
+ }
- try{
if (batch == null) return IterOutcome.NONE;
@@ -119,7 +120,7 @@ public class WireRecordBatch implements RecordBatch{
}else{
return IterOutcome.OK;
}
- }catch(SchemaChangeException ex){
+ }catch(SchemaChangeException | IOException ex){
context.fail(ex);
return IterOutcome.STOP;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 6d8a284..fd392a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -132,7 +132,13 @@ public class MergingRecordBatch implements RecordBatch {
// set up each (non-empty) incoming record batch
List<RawFragmentBatch> rawBatches = Lists.newArrayList();
for (RawFragmentBatchProvider provider : fragProviders) {
- RawFragmentBatch rawBatch = provider.getNext();
+ RawFragmentBatch rawBatch = null;
+ try {
+ rawBatch = provider.getNext();
+ } catch (IOException e) {
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
if (rawBatch.getHeader().getDef().getRecordCount() != 0)
rawBatches.add(rawBatch);
}
@@ -226,7 +232,12 @@ public class MergingRecordBatch implements RecordBatch {
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
- incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+ try {
+ incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+ } catch (IOException e) {
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index 3390af9..6f5f7a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -19,9 +19,11 @@ package org.apache.drill.exec.record;
import org.apache.drill.exec.ops.FragmentContext;
+import java.io.IOException;
+
public interface RawFragmentBatchProvider {
- public RawFragmentBatch getNext();
+ public RawFragmentBatch getNext() throws IOException;
public void kill(FragmentContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
new file mode 100644
index 0000000..10a4dc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class provides a Syncable local extension of the hadoop FileSystem
+ */
+public class LocalSyncableFileSystem extends FileSystem {
+
+ @Override
+ public URI getUri() {
+ try {
+ return new URI("drill-local:///");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ return new FSDataInputStream(new LocalInputStream(path));
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i2, long l, Progressable progressable) throws IOException {
+ return new FSDataOutputStream(new LocalSyncableOutputStream(path));
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+ throw new IOException("Append is not supported in LocalSyncableFilesystem");
+ }
+
+ @Override
+ public boolean rename(Path path, Path path2) throws IOException {
+ throw new IOException("Rename not supported");
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ File file = new File(path.toString());
+ return file.delete();
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) throws IOException {
+ File file = new File(path.toString());
+ if (b) {
+ if (file.isDirectory()) {
+ FileUtils.deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ } else if (file.isDirectory()) {
+ throw new IOException("Cannot delete directory");
+ }
+ file.delete();
+ return true;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ throw new IOException("listStatus not supported");
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return null;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ return new File(path.toString()).mkdirs();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return null;
+ }
+
+ public class LocalSyncableOutputStream extends OutputStream implements Syncable {
+ private FileOutputStream fos;
+ private BufferedOutputStream output;
+
+ public LocalSyncableOutputStream(Path path) throws FileNotFoundException {
+ File dir = new File(path.getParent().toString());
+ if (!dir.exists()) {
+ boolean success = dir.mkdirs();
+ if (!success) {
+ throw new FileNotFoundException("failed to create parent directory");
+ }
+ }
+ fos = new FileOutputStream(new File(path.toString()));
+ output = new BufferedOutputStream(fos, 64*1024);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ output.flush();
+ fos.getFD().sync();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ output.write(b);
+ }
+ }
+
+ public class LocalInputStream extends InputStream implements Seekable, PositionedReadable {
+
+ private FileInputStream input;
+
+ public LocalInputStream(Path path) throws IOException {
+ input = new FileInputStream(path.toString());
+ }
+
+ @Override
+ public int read(long l, byte[] bytes, int i, int i2) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void readFully(long l, byte[] bytes, int i, int i2) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void readFully(long l, byte[] bytes) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void seek(long l) throws IOException {
+ input.reset();
+ input.skip(l);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ throw new IOException("getPos not supported");
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ throw new IOException("seekToNewSource not supported");
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ input.read(b);
+ return (int) b[0] & 0xFF;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index d58de2f..7023373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -17,10 +17,17 @@
*/
package org.apache.drill.exec.work.batch;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -38,7 +45,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
private final AtomicInteger parentAccounter;
private final AtomicInteger finishedStreams = new AtomicInteger();
- public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired) {
+ public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
Preconditions.checkArgument(minInputsRequired > 0);
Preconditions.checkNotNull(receiver);
Preconditions.checkNotNull(parentAccounter);
@@ -48,8 +55,15 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
this.remainders = new AtomicIntegerArray(incoming.size());
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
this.buffers = new RawBatchBuffer[minInputsRequired];
- for(int i = 0; i < buffers.length; i++){
- buffers[i] = new UnlimitedRawBatchBuffer();
+ try {
+ String bufferClassName = context.getConfig().getString(ExecConstants.SPOOLING_BUFFER_IMPL);
+ Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class);
+ for(int i = 0; i < buffers.length; i++) {
+ buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context);
+ }
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+ NoSuchMethodException | ClassNotFoundException e) {
+ context.fail(e);
}
if (receiver.supportsOutOfOrderExchange()) {
this.remainingRequired = new AtomicInteger(1);
@@ -68,7 +82,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
public abstract void streamFinished(int minorFragmentId);
- public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) throws IOException {
boolean decremented = false;
if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
int rem = remainingRequired.decrementAndGet();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index 236b239..539393c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -21,10 +21,12 @@ package org.apache.drill.exec.work.batch;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import java.io.IOException;
+
interface BatchCollector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
- public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) throws IOException ;
public int getOppositeMajorFragmentId();
public RawBatchBuffer[] getBuffers();
public int getTotalIncomingFragments();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 5639851..5d20026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -111,7 +111,7 @@ public class BitComHandlerImpl implements BitComHandler {
@Override
public void startNewRemoteFragment(PlanFragment fragment){
logger.debug("Received remote fragment start instruction", fragment);
- FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
RemoteFragmentRunnerListener listener = new RemoteFragmentRunnerListener(context, tunnel);
try{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index c9e5608..992d9a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,11 +17,13 @@
*/
package org.apache.drill.exec.work.batch;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
@@ -41,12 +43,14 @@ public class IncomingBuffers {
private final AtomicInteger streamsRemaining = new AtomicInteger(0);
private final AtomicInteger remainingRequired = new AtomicInteger(0);
private final Map<Integer, BatchCollector> fragCounts;
+ private final FragmentContext context;
- public IncomingBuffers(PhysicalOperator root) {
+ public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
+ this.context = context;
Map<Integer, BatchCollector> counts = Maps.newHashMap();
CountRequiredFragments reqFrags = new CountRequiredFragments();
root.accept(reqFrags, counts);
-
+
logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), counts);
fragCounts = ImmutableMap.copyOf(counts);
@@ -68,10 +72,14 @@ public class IncomingBuffers {
int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d.", sendMajorFragmentId));
- boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
-
- // we should only return true if remaining required has been decremented and is currently equal to zero.
- return decremented && remainingRequired.get() == 0;
+ try {
+ boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
+
+ // we should only return true if remaining required has been decremented and is currently equal to zero.
+ return decremented && remainingRequired.get() == 0;
+ } catch (IOException e) {
+ throw new FragmentSetupException(e);
+ }
}
public int getRemainingRequired() {
@@ -94,9 +102,9 @@ public class IncomingBuffers {
public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
BatchCollector set;
if (receiver.supportsOutOfOrderExchange()) {
- set = new MergingCollector(remainingRequired, receiver);
+ set = new MergingCollector(remainingRequired, receiver, context);
} else {
- set = new PartitionedCollector(remainingRequired, receiver);
+ set = new PartitionedCollector(remainingRequired, receiver, context);
}
counts.put(set.getOppositeMajorFragmentId(), set);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 670347c..1c92bbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -19,14 +19,15 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
public class MergingCollector extends AbstractFragmentCollector{
private AtomicInteger streamsRunning;
- public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
- super(parentAccounter, receiver, 1);
+ public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+ super(parentAccounter, receiver, 1, context);
streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index af12778..f998eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -19,12 +19,13 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
public class PartitionedCollector extends AbstractFragmentCollector{
- public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver) {
- super(parentAccounter, receiver, receiver.getProvidingEndpoints().size());
+ public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+ super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 82ed1ca..e7d3d06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -21,10 +21,12 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import java.io.IOException;
+
public interface RawBatchBuffer extends RawFragmentBatchProvider{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawBatchBuffer.class);
- public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);
+ public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException;
/**
* Inform the buffer that no more records are expected.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
new file mode 100644
index 0000000..fa20b3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Queues;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This implementation of RawBatchBuffer starts writing incoming batches to disk once the buffer size reaches a threshold.
+ * The order of the incoming buffers is maintained.
+ */
+public class SpoolingRawBatchBuffer implements RawBatchBuffer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
+
+ private static String HADOOP_FILESYSTEM_DEFAULT_NAME = "fs.default.name";
+ private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
+ private static final float STOP_SPOOLING_FRACTION = (float) 0.5;
+
+ private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
+ private volatile boolean finished = false;
+ private volatile long queueSize = 0;
+ private long threshold;
+ private FragmentContext context;
+ private volatile AtomicBoolean spooling = new AtomicBoolean(false);
+ private FileSystem fs;
+ private Path path;
+ private FSDataOutputStream outputStream;
+ private FSDataInputStream inputStream;
+
+ public SpoolingRawBatchBuffer(FragmentContext context) throws IOException {
+ this.context = context;
+ this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_FILESYSTEM_DEFAULT_NAME, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
+ conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
+ this.fs = FileSystem.get(conf);
+ this.path = new Path(getDir(), getFileName());
+ }
+
+ public static List<String> DIRS = DrillConfig.create().getStringList(ExecConstants.TEMP_DIRECTORIES);
+
+ public static String getDir() {
+ Random random = new Random();
+ return DIRS.get(random.nextInt(DIRS.size()));
+ }
+
+ @Override
+ public synchronized void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException {
+ RawFragmentBatchWrapper wrapper;
+ boolean spool = spooling.get();
+ wrapper = new RawFragmentBatchWrapper(batch, !spool);
+ queueSize += wrapper.getBodySize();
+ if (spool) {
+ if (outputStream == null) {
+ outputStream = fs.create(path);
+ }
+ wrapper.writeToStream(outputStream);
+ }
+ buffer.add(wrapper);
+ if (!spool && queueSize > threshold) {
+ logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", queueSize, threshold);
+ spooling.set(true);
+ }
+ }
+
+ @Override
+ public void kill(FragmentContext context) {
+ cleanup();
+ }
+
+
+ @Override
+ public void finished() {
+ finished = true;
+ }
+
+ @Override
+ public RawFragmentBatch getNext() throws IOException {
+ boolean spool = spooling.get();
+ RawFragmentBatchWrapper w = buffer.poll();
+ RawFragmentBatch batch;
+ if(w == null && !finished){
+ try {
+ w = buffer.take();
+ batch = w.get();
+ queueSize -= w.getBodySize();
+ return batch;
+ } catch (InterruptedException e) {
+ cleanup();
+ return null;
+ }
+ }
+ if (w == null) {
+ cleanup();
+ return null;
+ }
+
+ batch = w.get();
+ queueSize -= w.getBodySize();
+ assert queueSize >= 0;
+ if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
+ logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
+ spooling.set(false);
+ }
+ return batch;
+ }
+
+ private void cleanup() {
+ try {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException e) {
+ logger.warn("Failed to cleanup I/O streams", e);
+ }
+ if (context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
+ try {
+ fs.delete(path,false);
+ } catch (IOException e) {
+ logger.warn("Failed to delete temporary files", e);
+ }
+ logger.debug("Deleted file {}", path.toString());
+ }
+ }
+
+ private class RawFragmentBatchWrapper {
+ private RawFragmentBatch batch;
+ private boolean available;
+ private CountDownLatch latch = new CountDownLatch(1);
+ private int bodyLength;
+
+ public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
+ Preconditions.checkNotNull(batch);
+ this.batch = batch;
+ this.available = available;
+ }
+
+ public boolean isNull() {
+ return batch == null;
+ }
+
+ public RawFragmentBatch get() throws IOException {
+ if (available) {
+ return batch;
+ } else {
+ if (inputStream == null) {
+ inputStream = fs.open(path);
+ }
+ readFromStream(inputStream);
+ available = true;
+ return batch;
+ }
+ }
+
+ public long getBodySize() {
+ if (batch.getBody() == null) {
+ return 0;
+ }
+ assert batch.getBody().readableBytes() >= 0;
+ return batch.getBody().readableBytes();
+ }
+
+ public void writeToStream(FSDataOutputStream stream) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ available = false;
+ batch.getHeader().writeDelimitedTo(stream);
+ ByteBuf buf = batch.getBody();
+ if (buf == null) {
+ bodyLength = 0;
+ return;
+ }
+ bodyLength = buf.readableBytes();
+ buf.getBytes(0, stream, bodyLength);
+ stream.sync();
+ long t = watch.elapsed(TimeUnit.MICROSECONDS);
+ logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ buf.release();
+ }
+
+ public void readFromStream(FSDataInputStream stream) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ ExecProtos.FragmentRecordBatch header = ExecProtos.FragmentRecordBatch.parseDelimitedFrom(stream);
+ ByteBuf buf = context.getAllocator().buffer(bodyLength);
+ buf.writeBytes(stream, bodyLength);
+ batch = new RawFragmentBatch(header, buf);
+ available = true;
+ latch.countDown();
+ long t = watch.elapsed(TimeUnit.MICROSECONDS);
+ logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ }
+ }
+
+ private String getFileName() {
+ ExecProtos.FragmentHandle handle = context.getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String fileName = String.format("%s_%s_%s", qid, majorFragmentId, minorFragmentId);
+
+ return fileName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 43870da..64012c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -31,6 +31,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
private volatile boolean finished = false;
+
+ public UnlimitedRawBatchBuffer(FragmentContext context) {
+
+ }
@Override
public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 588316b..2cb57dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -78,9 +78,9 @@ class RunningFragmentManager implements FragmentStatusListener{
// set up the root fragment first so we'll have incoming buffers available.
{
- IncomingBuffers buffers = new IncomingBuffers(rootOperator);
-
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+ rootContext.setBuffers(buffers);
RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
// add fragment to local node.
map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index d947d68..6157229 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -56,8 +56,9 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
try{
this.fragment = fragment;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- this.buffers = new IncomingBuffers(root);
- this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
+ this.context = new FragmentContext(context, fragment.getHandle(), null, new FunctionImplementationRegistry(context.getConfig()));
+ this.buffers = new IncomingBuffers(root, this.context);
+ this.context.setBuffers(buffers);
this.runnerListener = new RemoteFragmentRunnerListener(this.context, foremanTunnel);
this.reader = context.getPlanReader();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 725c6b4..b84c406 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -66,9 +66,18 @@ drill.exec: {
max.width.per.endpoint: 5,
global.max.width: 100,
executor.threads: 4
- }
+ },
trace: {
directory: "/var/log/drill",
filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 100000000
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index 0ecab3a..c06818d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -74,7 +74,7 @@ public class DumpCatTest {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
index aa67ba5..8f56464 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
@@ -67,7 +67,7 @@ public class TestRepeatedFunction {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_repeated_1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
boolean oneIsOne = false;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 4dea33d..ea0ac2c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -62,7 +62,7 @@ public class TestComparisonFunctions {
String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression);
if(reader == null) reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
if(registry == null) registry = new FunctionImplementationRegistry(c);
- if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
PhysicalPlan plan = reader.readPhysicalPlan(planString);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 05d57be..99ab362 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -99,7 +99,7 @@ public class TestOptiqPlans {
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(fctxt, (FragmentRoot) pp.getSortedOperators(false).iterator().next()));
return exec;
@@ -278,7 +278,7 @@ public class TestOptiqPlans {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), reg);
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
return exec;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index 411f21c..d92d9fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -65,7 +65,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNull.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -93,7 +93,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNotNull.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -121,7 +121,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstring.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -161,7 +161,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstringNegative.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -201,7 +201,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testByteSubstring.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
index 4eae66f..89f3292 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
@@ -61,7 +61,7 @@ public class TestAgg {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
return exec;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index d1c756f..5429bcf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -64,7 +64,7 @@ public class TestSimpleFilter {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
assertEquals(50, exec.getRecordCount());
@@ -88,7 +88,7 @@ public class TestSimpleFilter {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
while(exec.next()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 41cb034..0120c7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -70,7 +70,7 @@ public class TestMergeJoin {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -124,7 +124,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -177,7 +177,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -230,7 +230,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 1ee9ceb..d82c0e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -95,7 +95,7 @@ public class TestSimpleLimit {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
while(exec.next()){
@@ -114,7 +114,7 @@ public class TestSimpleLimit {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
long sum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index 72dbe6e..fa67e06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -67,7 +67,7 @@ public class TestSimpleProjection {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/project/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
index 4472668..a5837ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -66,7 +66,7 @@ public class TestSimpleSort {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/one_key_sort.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int previousInt = Integer.MIN_VALUE;
@@ -114,7 +114,7 @@ public class TestSimpleSort {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/two_key_sort.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int previousInt = Integer.MIN_VALUE;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index 5d2b67c..b53d1d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -66,7 +66,7 @@ public class TestSVRemover {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/remover/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
int count = exec.getRecordCount();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index a3d923e..11aae2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -70,7 +70,7 @@ public class TestTraceMultiRecordBatch {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/multi_record_batch_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index 7643cf8..10ce997 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -84,7 +84,7 @@ public class TestTraceOutputDump {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index fe90ad4..836f177 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -64,7 +64,7 @@ public class TestSimpleUnion {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/union/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int[] counts = new int[]{100,50};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
new file mode 100644
index 0000000..4406e04
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class FileTest {
+ public static void main(String[] args) throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "sync:///");
+ System.out.println(FileSystem.getDefaultUri(conf));
+ FileSystem fs = FileSystem.get(conf);
+// FileSystem fs = new LocalSyncableFileSystem(conf);
+ Path path = new Path("/tmp/testFile");
+ FSDataOutputStream out = fs.create(path);
+ byte[] s = "hello world".getBytes();
+ out.write(s);
+ out.sync();
+// out.close();
+ FSDataInputStream in = fs.open(path);
+ byte[] bytes = new byte[s.length];
+ in.read(bytes);
+ System.out.println(new String(bytes));
+ File file = new File("/tmp/testFile");
+ FileOutputStream fos = new FileOutputStream(file);
+ FileInputStream fis = new FileInputStream(file);
+ fos.write(s);
+ fos.getFD().sync();
+ fis.read(bytes);
+ System.out.println(new String(bytes));
+ out = fs.create(new Path("/tmp/file"));
+ for (int i = 0; i < 100; i++) {
+ bytes = new byte[256*1024];
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ out.write(bytes);
+ out.sync();
+ long t = watch.elapsed(TimeUnit.MILLISECONDS);
+ System.out.printf("Elapsed: %d. Rate %d.\n", t, (long) ((long) bytes.length * 1000L / t));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
new file mode 100644
index 0000000..cf5e128
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSpoolingBuffer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpoolingBuffer.class);
+
+ @Test
+ public void testMultipleExchangesSingleThread() throws Exception {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ DrillConfig conf = DrillConfig.create("drill-spool-test-module.conf");
+
+ try(Drillbit bit1 = new Drillbit(conf, serviceSet);
+ DrillClient client = new DrillClient(conf, serviceSet.getCoordinator());) {
+
+ bit1.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/work/batch/multiple_exchange.json"),
+ Charsets.UTF_8));
+ int count = 0;
+ for(QueryResultBatch b : results) {
+ if (b.getHeader().getRowCount() != 0)
+ count += b.getHeader().getRowCount();
+ }
+ assertEquals(500024, count);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-module.conf b/exec/java-exec/src/test/resources/drill-module.conf
index 99dd863..5803610 100644
--- a/exec/java-exec/src/test/resources/drill-module.conf
+++ b/exec/java-exec/src/test/resources/drill-module.conf
@@ -56,8 +56,8 @@ drill.exec: {
start: 35000
},
work: {
- max.width.per.endpoint: 5,
+ max.width.per.endpoint: 1,
global.max.width: 100,
- executor.threads: 4
+ executor.threads: 1
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-spool-test-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-spool-test-module.conf b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
new file mode 100644
index 0000000..c20cc85
--- /dev/null
+++ b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
@@ -0,0 +1,83 @@
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
+drill.exec: {
+ cluster-id: "drillbits1"
+ rpc: {
+ user: {
+ server: {
+ port: 31010
+ threads: 1
+ }
+ client: {
+ threads: 1
+ }
+ },
+ bit: {
+ server: {
+ port : 31011,
+ retry:{
+ count: 7200,
+ delay: 500
+ },
+ threads: 1
+ }
+ },
+ use.ip : false
+ },
+ operator: {
+ packages += "org.apache.drill.exec.physical.config"
+ },
+ optimizer: {
+ implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+ },
+ functions: ["org.apache.drill.expr.fn.impl"],
+ storage: {
+ packages += "org.apache.drill.exec.store"
+ },
+ metrics : {
+ context: "drillbit",
+ jmx: {
+ enabled : true
+ },
+ log: {
+ enabled : false,
+ interval : 60
+ }
+ },
+ zk: {
+ connect: "localhost:2181",
+ root: "/drill",
+ refresh: 500,
+ timeout: 5000,
+ retry: {
+ count: 7200,
+ delay: 500
+ }
+ },
+ functions: ["org.apache.drill.expr.fn.impl"],
+ network: {
+ start: 35000
+ },
+ work: {
+ max.width.per.endpoint: 1,
+ global.max.width: 100,
+ executor.threads: 1
+ },
+ trace: {
+ directory: "/var/log/drill",
+ filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 0
+ }
+}
\ No newline at end of file