You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:56 UTC
[19/24] git commit: more diag fixes
more diag fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/79054a85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/79054a85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/79054a85
Branch: refs/heads/diagnostics2
Commit: 79054a85a979e0d2640855edb0e7fd96b69397fb
Parents: f4c37bf
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 21 08:38:11 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 21 13:32:00 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +
.../drill/exec/cache/CachedVectorContainer.java | 24 +++-
.../drill/exec/cache/local/LocalCache.java | 7 +-
.../apache/drill/exec/client/DrillClient.java | 2 +-
.../drill/exec/expr/EvaluationVisitor.java | 3 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 132 ++++++++++---------
.../impl/project/ProjectRecordBatch.java | 7 +-
.../exec/planner/common/DrillWriterRelBase.java | 15 +++
.../exec/planner/logical/DrillWriterRel.java | 1 +
.../drill/exec/planner/physical/WriterPrel.java | 7 +-
.../physical/visitor/FinalColumnReorderer.java | 5 +
.../drill/exec/planner/sql/DrillSqlWorker.java | 2 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 5 +-
.../drill/exec/rpc/control/ControllerImpl.java | 6 +-
.../exec/rpc/data/DataConnectionCreator.java | 6 +-
.../org/apache/drill/exec/server/Drillbit.java | 17 ++-
.../drill/exec/service/ServiceEngine.java | 18 +--
.../drill/exec/store/sys/SystemTableScan.java | 6 +
.../src/main/resources/drill-module.conf | 4 +
.../apache/drill/exec/server/TestBitRpc.java | 2 +-
.../exec/store/json/JsonRecordReader2Test.java | 4 +-
exec/java-exec/src/test/resources/logback.xml | 2 +-
pom.xml | 2 +-
23 files changed, 179 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 238fae9..d9e0833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -71,5 +71,8 @@ public interface ExecConstants {
public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
public static final String PARQUET_BLOCK_SIZE = "parquet.block.size";
public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+ public static final String HTTP_ENABLE = "drill.exec.http.enabled";
+ public static final String HTTP_PORT = "drill.exec.http.port";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
index 1447e28..da0b186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.cache;
import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -29,7 +31,7 @@ import org.apache.drill.exec.record.WritableBatch;
public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachedVectorContainer.class);
- private final byte[] data;
+ private byte[] data;
private final BufferAllocator allocator;
private VectorContainer container;
@@ -42,6 +44,10 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
va.clear();
}
+ public CachedVectorContainer(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
public CachedVectorContainer(byte[] data, BufferAllocator allocator) {
this.data = data;
this.allocator = allocator;
@@ -58,6 +64,20 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
}
+
+ @Override
+ public void read(DataInput input) throws IOException {
+ int len = input.readInt();
+ this.data = new byte[len];
+ input.readFully(data);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(data.length);
+ output.write(data);
+ }
+
public VectorAccessible get() {
if (container == null) {
construct();
@@ -66,7 +86,7 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
}
public void clear() {
- container.clear();
+ if(container != null) container.clear();
container = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
index 942e09e..1b44c6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.cache.local;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.DistributedCache;
@@ -171,8 +171,7 @@ public class LocalCache implements DistributedCache {
}
}
- ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
- InputStream inputStream = DataInputInputStream.constructInputStream(in);
+ InputStream inputStream = new ByteArrayInputStream(bytes);
try {
V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
obj.readFromStream(inputStream);
@@ -220,7 +219,7 @@ public class LocalCache implements DistributedCache {
if (m.get(key) == null) return null;
ByteArrayDataOutput b = m.get(key);
byte[] bytes = b.toByteArray();
- return (V) deserialize(m.get(key).toByteArray(), this.clazz);
+ return (V) deserialize(bytes, this.clazz);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 3b87dc4..92097e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -188,7 +188,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
* Closes this client's connection to the server
*/
public void close(){
- this.client.close();
+ if(this.client != null) this.client.close();
if(ownsZkConnection){
try {
this.clusterCoordinator.close();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 731ab6b..ba846b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -383,7 +383,8 @@ public class EvaluationVisitor {
PathSegment seg = e.getReadPath();
int listNum = 0;
boolean lastWasArray = false;
- while(true){
+
+ while(seg != null){
if(seg.isArray()){
lastWasArray = true;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 46dea64..121cfec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -130,77 +130,83 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
@Override
public IterOutcome next() {
+ stats.startProcessing();
+
+ try{
+ // we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
+ status.ensureInitial();
+
+ // loop so we can start over again if we find a new batch was created.
+ while(true){
+
+ JoinOutcome outcome = status.getOutcome();
+ // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+ if (outcome == JoinOutcome.BATCH_RETURNED ||
+ outcome == JoinOutcome.SCHEMA_CHANGED)
+ allocateBatch();
+
+ // reset the output position to zero after our parent iterates this RecordBatch
+ if (outcome == JoinOutcome.BATCH_RETURNED ||
+ outcome == JoinOutcome.SCHEMA_CHANGED ||
+ outcome == JoinOutcome.NO_MORE_DATA)
+ status.resetOutputPos();
+
+ if (outcome == JoinOutcome.NO_MORE_DATA) {
+ logger.debug("NO MORE DATA; returning {} NONE");
+ return IterOutcome.NONE;
+ }
- // we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
- status.ensureInitial();
-
- // loop so we can start over again if we find a new batch was created.
- while(true){
-
- JoinOutcome outcome = status.getOutcome();
- // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
- if (outcome == JoinOutcome.BATCH_RETURNED ||
- outcome == JoinOutcome.SCHEMA_CHANGED)
- allocateBatch();
-
- // reset the output position to zero after our parent iterates this RecordBatch
- if (outcome == JoinOutcome.BATCH_RETURNED ||
- outcome == JoinOutcome.SCHEMA_CHANGED ||
- outcome == JoinOutcome.NO_MORE_DATA)
- status.resetOutputPos();
+ boolean first = false;
+ if(worker == null){
+ try {
+ logger.debug("Creating New Worker");
+ stats.startSetup();
+ this.worker = generateNewWorker();
+ first = true;
+ stats.stopSetup();
+ } catch (ClassTransformationException | IOException | SchemaChangeException e) {
+ stats.stopSetup();
+ context.fail(new SchemaChangeException(e));
+ kill();
+ return IterOutcome.STOP;
+ }
+ }
- if (outcome == JoinOutcome.NO_MORE_DATA) {
- logger.debug("NO MORE DATA; returning {} NONE");
- return IterOutcome.NONE;
- }
+ // join until we have a complete outgoing batch
+ if (!worker.doJoin(status))
+ worker = null;
- boolean first = false;
- if(worker == null){
- try {
- logger.debug("Creating New Worker");
- stats.startSetup();
- this.worker = generateNewWorker();
- first = true;
- stats.stopSetup();
- } catch (ClassTransformationException | IOException | SchemaChangeException e) {
- stats.stopSetup();
- context.fail(new SchemaChangeException(e));
+ // get the outcome of the join.
+ switch(status.getOutcome()){
+ case BATCH_RETURNED:
+ // only return new schema if new worker has been setup.
+ logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
+ return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+ case FAILURE:
kill();
return IterOutcome.STOP;
+ case NO_MORE_DATA:
+ logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+ return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
+ case SCHEMA_CHANGED:
+ worker = null;
+ if(status.getOutPosition() > 0){
+ // if we have current data, let's return that.
+ logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
+ return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+ }else{
+ // loop again to rebuild worker.
+ continue;
+ }
+ case WAITING:
+ return IterOutcome.NOT_YET;
+ default:
+ throw new IllegalStateException();
}
}
- // join until we have a complete outgoing batch
- if (!worker.doJoin(status))
- worker = null;
-
- // get the outcome of the join.
- switch(status.getOutcome()){
- case BATCH_RETURNED:
- // only return new schema if new worker has been setup.
- logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
- return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
- case FAILURE:
- kill();
- return IterOutcome.STOP;
- case NO_MORE_DATA:
- logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
- return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
- case SCHEMA_CHANGED:
- worker = null;
- if(status.getOutPosition() > 0){
- // if we have current data, let's return that.
- logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
- return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
- }else{
- // loop again to rebuild worker.
- continue;
- }
- case WAITING:
- return IterOutcome.NOT_YET;
- default:
- throw new IllegalStateException();
- }
+ }finally{
+ stats.stopProcessing();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index fe19797..96d3242 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -180,10 +180,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
boolean isAnyWildcard = isAnyWildcard(exprs);
if(isAnyWildcard){
+
+ // add this until we have sv2 project on wildcard working correctly.
+ if(incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE){
+ throw new UnsupportedOperationException("Drill doesn't yet wildcard projects where there is a sv2, patch coming shortly.");
+ }
for(VectorWrapper<?> wrapper : incoming){
ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath();
+ String name = vvIn.getField().getPath().getRootSegment().getPath();
FieldReference ref = new FieldReference(name);
TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java
index 357cb2e..03431d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java
@@ -17,18 +17,33 @@
*/
package org.apache.drill.exec.planner.common;
+import java.util.List;
+
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.SingleRel;
import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptCluster;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.ImmutableList;
+import com.google.hive12.common.collect.Lists;
/** Base class for logical and physical Writer implemented in Drill. */
public abstract class DrillWriterRelBase extends SingleRel implements DrillRelNode {
+ private static final List<String> FIELD_NAMES = ImmutableList.of("Fragment", "Number of records written");
private final CreateTableEntry createTableEntry;
+ protected void setRowType(){
+ List<RelDataType> fields = Lists.newArrayList();
+ fields.add(this.getCluster().getTypeFactory().createSqlType(SqlTypeName.VARCHAR, 255));
+ fields.add(this.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
+ this.rowType = this.getCluster().getTypeFactory().createStructType(fields, FIELD_NAMES);
+ }
+
public DrillWriterRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
CreateTableEntry createTableEntry) {
super(cluster, traitSet, input);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
index f212026..04dd133 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
@@ -30,6 +30,7 @@ public class DrillWriterRel extends DrillWriterRelBase implements DrillRel {
public DrillWriterRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, CreateTableEntry createTableEntry) {
super(DRILL_LOGICAL, cluster, traitSet, input, createTableEntry);
+ setRowType();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index a7f611c..233b20b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -37,14 +37,11 @@ import com.google.hive12.common.collect.Lists;
public class WriterPrel extends DrillWriterRelBase implements Prel {
- private static final List<String> FIELD_NAMES = ImmutableList.of("Fragment", "Number of records written");
+
public WriterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, CreateTableEntry createTableEntry) {
super(Prel.DRILL_PHYSICAL, cluster, traits, child, createTableEntry);
- List<RelDataType> fields = Lists.newArrayList();
- fields.add(cluster.getTypeFactory().createSqlType(SqlTypeName.VARCHAR, 255));
- fields.add(cluster.getTypeFactory().createSqlType(SqlTypeName.BIGINT));
- this.rowType = cluster.getTypeFactory().createStructType(fields, FIELD_NAMES);
+ setRowType();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
index 6ed3c1f..4ea82cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java
@@ -48,9 +48,14 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc
private Prel addTrivialOrderedProjectPrel(Prel prel){
RelDataType t = prel.getRowType();
+
RexBuilder b = prel.getCluster().getRexBuilder();
List<RexNode> projections = Lists.newArrayList();
int projectCount = t.getFieldList().size();
+
+ // no point in reordering if we only have one column
+ if(projectCount < 2) return prel;
+
for(int i =0; i < projectCount; i++){
projections.add(b.makeInputRef(prel, i));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index df66dcf..eb2c891 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -100,7 +100,7 @@ public class DrillSqlWorker {
}
public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{
- return getPlan(null);
+ return getPlan(sql, null);
}
public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws SqlParseException, ValidationException, RelConversionException, IOException{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a3307cf..a912778 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -142,15 +142,16 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
return null;
}
- public int bind(final int initialPort) throws InterruptedException, DrillbitStartupException {
+ public int bind(final int initialPort, boolean allowPortHunting) throws InterruptedException, DrillbitStartupException {
int port = initialPort - 1;
while (true) {
try {
b.bind(++port).sync();
break;
} catch (Exception e) {
- if (e instanceof BindException)
+ if (e instanceof BindException && allowPortHunting){
continue;
+ }
throw new DrillbitStartupException("Could not bind Drillbit", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index de8caf6..1cacc4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -39,19 +39,21 @@ public class ControllerImpl implements Controller {
private final ControlMessageHandler handler;
private final BootStrapContext context;
private final ConnectionManagerRegistry connectionRegistry;
+ private final boolean allowPortHunting;
- public ControllerImpl(BootStrapContext context, ControlMessageHandler handler) {
+ public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) {
super();
this.handler = handler;
this.context = context;
this.connectionRegistry = new ConnectionManagerRegistry(handler, context);
+ this.allowPortHunting = allowPortHunting;
}
@Override
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
server = new ControlServer(handler, context, connectionRegistry);
int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
- port = server.bind(port);
+ port = server.bind(port, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
connectionRegistry.setEndpoint(completeEndpoint);
return completeEndpoint;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index f15494f..9c2ef5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -37,17 +37,19 @@ public class DataConnectionCreator implements Closeable {
private final BootStrapContext context;
private final WorkEventBus workBus;
private final DataResponseHandler dataHandler;
+ private final boolean allowPortHunting;
- public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
+ public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) {
super();
this.context = context;
this.workBus = workBus;
this.dataHandler = dataHandler;
+ this.allowPortHunting = allowPortHunting;
}
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
server = new DataServer(context, workBus, dataHandler);
- int port = server.bind(partialEndpoint.getControlPort() + 1);
+ int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
return completeEndpoint;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index eba6e92..fb499b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -81,11 +81,18 @@ public class Drillbit implements Closeable{
private volatile RegistrationHandle handle;
public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
-
+ boolean allowPortHunting = serviceSet != null;
+ boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE);
this.context = new BootStrapContext(config);
this.manager = new WorkManager(context);
- this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
- this.embeddedJetty = new Server(8047);
+ this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler(), allowPortHunting);
+
+ if(enableHttp){
+ this.embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT));
+ }else{
+ this.embeddedJetty = null;
+ }
+
if(serviceSet != null){
this.coord = serviceSet.getCoordinator();
@@ -99,6 +106,8 @@ public class Drillbit implements Closeable{
}
private void startJetty() throws Exception{
+ if(embeddedJetty == null) return;
+
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
context.setContextPath("/");
embeddedJetty.setHandler(context);
@@ -131,7 +140,7 @@ public class Drillbit implements Closeable{
logger.warn("Interrupted while sleeping during coordination deregistration.");
}
try {
- embeddedJetty.stop();
+ if(embeddedJetty != null) embeddedJetty.stop();
} catch (Exception e) {
logger.warn("Failure while shutting down embedded jetty server.");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index cfbde73..bd745d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -43,23 +43,25 @@ import com.google.common.io.Closeables;
public class ServiceEngine implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
-
+
private final UserServer userServer;
private final Controller controller;
private final DataConnectionCreator dataPool;
private final DrillConfig config;
boolean useIP = false;
-
- public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler){
+ private final boolean allowPortHunting;
+
+ public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting){
this.userServer = new UserServer(context.getAllocator(), new NioEventLoopGroup(context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS),
new NamedThreadFactory("UserServer-")), userWorker);
- this.controller = new ControllerImpl(context, controlMessageHandler);
- this.dataPool = new DataConnectionCreator(context, workBus, dataHandler);
+ this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting);
+ this.dataPool = new DataConnectionCreator(context, workBus, dataHandler, allowPortHunting);
this.config = context.getConfig();
+ this.allowPortHunting = allowPortHunting;
}
-
+
public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
- int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
+ int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting);
String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
.setAddress(address)
@@ -74,7 +76,7 @@ public class ServiceEngine implements Closeable{
public DataConnectionCreator getDataConnectionCreator(){
return dataPool;
}
-
+
public Controller getController() {
return controller;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 9a745ac..b0133f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
@@ -112,6 +113,11 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
@Override
+ public int getOperatorType() {
+ return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE;
+ }
+
+ @Override
public GroupScan clone(List<SchemaPath> columns) {
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9a180fd..26205bd 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -65,6 +65,10 @@ drill.exec: {
delay: 500
}
},
+ http: {
+ enabled: true,
+ port: 8047
+ },
functions: ["org.apache.drill.expr.fn.impl"],
network: {
start: 35000
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 74f5ba9..f579448 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -74,7 +74,7 @@ public class TestBitRpc extends ExecTest {
DataResponseHandler drp = new BitComTestHandler();
DataServer server = new DataServer(c, workBus, drp);
- port = server.bind(port);
+ port = server.bind(port, false);
DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
DataConnectionManager manager = new DataConnectionManager(FragmentHandle.getDefaultInstance(), ep, c2);
DataTunnel tunnel = new DataTunnel(manager);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
index 0abdbd3..34bcb5e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
@@ -35,14 +35,14 @@ public class JsonRecordReader2Test extends BaseTestQuery{
}
@Test
- public void z() throws Exception{
+ public void testComplexMultipleTimes() throws Exception{
for(int i =0 ; i < 5; i++){
test("select * from cp.`join/merge_join.json`");
}
}
@Test
- public void y() throws Exception{
+ public void trySimpleQueryWithLimit() throws Exception{
test("select * from cp.`limit/test1.json` limit 10");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/logback.xml b/exec/java-exec/src/test/resources/logback.xml
index cd3d971..b8e0ca2 100644
--- a/exec/java-exec/src/test/resources/logback.xml
+++ b/exec/java-exec/src/test/resources/logback.xml
@@ -42,7 +42,7 @@
<!-- </logger> -->
<root>
- <level value="info" />
+ <level value="error" />
<appender-ref ref="STDOUT" />
</root>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d4c077a..5d976e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -260,7 +260,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
- <argLine>-Xms512m -Xmx1g -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
+ <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
<forkCount>4</forkCount>
<reuseForks>true</reuseForks>
<additionalClasspathElements>