You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/09/12 17:17:40 UTC
[31/37] DRILL-1402: Add check-style rules for trailing space,
TABs and blocks without braces
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index faca32a..39bdb94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -66,31 +66,32 @@ public final class JoinStatus {
this.joinType = output.getJoinType();
}
- private final IterOutcome nextLeft(){
+ private final IterOutcome nextLeft() {
return outputBatch.next(LEFT_INPUT, left);
}
- private final IterOutcome nextRight(){
+ private final IterOutcome nextRight() {
return outputBatch.next(RIGHT_INPUT, right);
}
- public final void ensureInitial(){
- if(!initialSet){
+ public final void ensureInitial() {
+ if(!initialSet) {
this.lastLeft = nextLeft();
this.lastRight = nextRight();
initialSet = true;
}
}
- public final void advanceLeft(){
+ public final void advanceLeft() {
leftPosition++;
}
- public final void advanceRight(){
- if (rightSourceMode == RightSourceMode.INCOMING)
+ public final void advanceRight() {
+ if (rightSourceMode == RightSourceMode.INCOMING) {
rightPosition++;
- else
+ } else {
svRightPosition++;
+ }
}
public final int getLeftPosition() {
@@ -101,7 +102,7 @@ public final class JoinStatus {
return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
}
- public final int getRightCount(){
+ public final int getRightCount() {
return right.getRecordCount();
}
@@ -153,9 +154,10 @@ public final class JoinStatus {
* Check if the left record position can advance by one.
* Side effect: advances to next left batch if current left batch size is exceeded.
*/
- public final boolean isLeftPositionAllowed(){
- if (lastLeft == IterOutcome.NONE)
+ public final boolean isLeftPositionAllowed() {
+ if (lastLeft == IterOutcome.NONE) {
return false;
+ }
if (!isLeftPositionInCurrentBatch()) {
leftPosition = 0;
releaseData(left);
@@ -170,11 +172,13 @@ public final class JoinStatus {
* Check if the right record position can advance by one.
* Side effect: advances to next right batch if current right batch size is exceeded
*/
- public final boolean isRightPositionAllowed(){
- if (rightSourceMode == RightSourceMode.SV4)
+ public final boolean isRightPositionAllowed() {
+ if (rightSourceMode == RightSourceMode.SV4) {
return svRightPosition < sv4.getCount();
- if (lastRight == IterOutcome.NONE)
+ }
+ if (lastRight == IterOutcome.NONE) {
return false;
+ }
if (!isRightPositionInCurrentBatch()) {
rightPosition = 0;
releaseData(right);
@@ -185,11 +189,13 @@ public final class JoinStatus {
return true;
}
- private void releaseData(RecordBatch b){
- for(VectorWrapper<?> v : b){
+ private void releaseData(RecordBatch b) {
+ for (VectorWrapper<?> v : b) {
v.clear();
}
- if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear();
+ if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+ b.getSelectionVector2().clear();
+ }
}
/**
@@ -220,29 +226,34 @@ public final class JoinStatus {
return rightPosition + 1 < right.getRecordCount();
}
- public JoinOutcome getOutcome(){
- if (!ok)
+ public JoinOutcome getOutcome() {
+ if (!ok) {
return JoinOutcome.FAILURE;
+ }
if (bothMatches(IterOutcome.NONE) ||
(joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
(joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) ||
- (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE))
+ (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) {
return JoinOutcome.NO_MORE_DATA;
+ }
if (bothMatches(IterOutcome.OK) ||
- (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK)))
+ (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) {
return JoinOutcome.BATCH_RETURNED;
- if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
+ }
+ if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) {
return JoinOutcome.SCHEMA_CHANGED;
- if (eitherMatches(IterOutcome.NOT_YET))
+ }
+ if (eitherMatches(IterOutcome.NOT_YET)) {
return JoinOutcome.WAITING;
+ }
return JoinOutcome.FAILURE;
}
- private boolean bothMatches(IterOutcome outcome){
+ private boolean bothMatches(IterOutcome outcome) {
return lastLeft == outcome && lastRight == outcome;
}
- private boolean eitherMatches(IterOutcome outcome){
+ private boolean eitherMatches(IterOutcome outcome) {
return lastLeft == outcome || lastRight == outcome;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index bb3b9ac..c1dffc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -94,8 +94,9 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
status.advanceLeft();
@@ -103,8 +104,9 @@ public abstract class JoinTemplate implements JoinWorker {
}
return true;
}
- if (!status.isLeftPositionAllowed())
+ if (!status.isLeftPositionAllowed()) {
return true;
+ }
int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
switch (comparison) {
@@ -112,8 +114,9 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
}
status.advanceLeft();
@@ -125,25 +128,27 @@ public abstract class JoinTemplate implements JoinWorker {
// check for repeating values on the left side
if (!status.isLeftRepeating() &&
status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) == 0)
+ doCompareNextLeftKey(status.getLeftPosition()) == 0) {
// subsequent record(s) in the left batch have the same key
status.notifyLeftRepeating();
-
- else if (status.isLeftRepeating() &&
+ } else if (status.isLeftRepeating() &&
status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) != 0)
+ doCompareNextLeftKey(status.getLeftPosition()) != 0) {
// this record marks the end of repeated keys
status.notifyLeftStoppedRepeating();
+ }
boolean crossedBatchBoundaries = false;
int initialRightPosition = status.getRightPosition();
do {
// copy all equal right keys to the output record batch
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
- if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
+ if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
@@ -159,9 +164,10 @@ public abstract class JoinTemplate implements JoinWorker {
} while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch()) && status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
if (status.getRightPosition() > initialRightPosition &&
- (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch()))
+ (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) {
// more than one matching result from right table; reset position in case of subsequent left match
status.setRightPosition(initialRightPosition);
+ }
status.advanceLeft();
if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) {
@@ -233,5 +239,4 @@ public abstract class JoinTemplate implements JoinWorker {
*/
protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index b24b534..1d4e353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -144,19 +144,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
status.ensureInitial();
// loop so we can start over again if we find a new batch was created.
- while(true){
+ while (true) {
JoinOutcome outcome = status.getOutcome();
// if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
if (outcome == JoinOutcome.BATCH_RETURNED ||
- outcome == JoinOutcome.SCHEMA_CHANGED)
+ outcome == JoinOutcome.SCHEMA_CHANGED) {
allocateBatch();
+ }
// reset the output position to zero after our parent iterates this RecordBatch
if (outcome == JoinOutcome.BATCH_RETURNED ||
outcome == JoinOutcome.SCHEMA_CHANGED ||
- outcome == JoinOutcome.NO_MORE_DATA)
+ outcome == JoinOutcome.NO_MORE_DATA) {
status.resetOutputPos();
+ }
if (outcome == JoinOutcome.NO_MORE_DATA) {
logger.debug("NO MORE DATA; returning {} NONE");
@@ -164,7 +166,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
boolean first = false;
- if(worker == null){
+ if (worker == null) {
try {
logger.debug("Creating New Worker");
stats.startSetup();
@@ -180,11 +182,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
// join until we have a complete outgoing batch
- if (!worker.doJoin(status))
+ if (!worker.doJoin(status)) {
worker = null;
+ }
// get the outcome of the join.
- switch(status.getOutcome()){
+ switch (status.getOutcome()) {
case BATCH_RETURNED:
// only return new schema if new worker has been setup.
logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
@@ -200,7 +203,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
case SCHEMA_CHANGED:
worker = null;
- if(status.getOutPosition() > 0){
+ if (status.getOutPosition() > 0) {
// if we have current data, let's return that.
logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
setRecordCountInContainer();
@@ -218,7 +221,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
private void setRecordCountInContainer() {
- for(VectorWrapper vw : container){
+ for (VectorWrapper vw : container) {
Preconditions.checkArgument(!vw.isHyper());
vw.getValueVector().getMutator().setValueCount(getRecordCount());
}
@@ -257,9 +260,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// materialize value vector readers from join expression
final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+ }
// generate compareNextLeftKey()
////////////////////////////////
@@ -475,9 +479,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
}
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+ }
LogicalExpression materializedRightExpr;
if (worker == null || status.isRightPositionAllowed()) {
@@ -485,9 +490,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
}
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString()));
+ }
// generate compare()
////////////////////////
@@ -519,4 +525,5 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
//Pass the equality check for all the join conditions. Finally, return 0.
cg.getEvalBlock()._return(JExpr.lit(0));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index 904d38c..1187bd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -50,15 +50,24 @@ public class MergeJoinBatchBuilder {
}
public boolean add(RecordBatch batch) {
- if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
+ if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {
throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
- if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+ }
+ if (batch.getRecordCount() == 0) {
+ return true; // skip over empty record batches.
+ }
// resource checks
long batchBytes = getSize(batch);
- if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary
- if (runningBatches++ >= Character.MAX_VALUE) return false; // allowed in batch.
- if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+ if (batchBytes + runningBytes > Integer.MAX_VALUE) {
+ return false; // TODO: 2GB is arbitrary
+ }
+ if (runningBatches++ >= Character.MAX_VALUE) {
+ return false; // allowed in batch.
+ }
+ if (!svAllocator.preAllocate(batch.getRecordCount()*4)) {
+ return false; // sv allocation available.
+ }
// transfer VVs to a new RecordBatchData
RecordBatchData bd = new RecordBatchData(batch);
@@ -68,9 +77,9 @@ public class MergeJoinBatchBuilder {
return true;
}
- private long getSize(RecordBatch batch){
+ private long getSize(RecordBatch batch) {
long bytes = 0;
- for(VectorWrapper<?> v : batch){
+ for (VectorWrapper<?> v : batch) {
bytes += v.getValueVector().getBufferSize();
}
return bytes;
@@ -78,18 +87,20 @@ public class MergeJoinBatchBuilder {
public void build() throws SchemaChangeException {
container.clear();
- if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ if (queuedRightBatches.size() > Character.MAX_VALUE) {
+ throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ }
status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
BatchSchema schema = queuedRightBatches.keySet().iterator().next();
List<RecordBatchData> data = queuedRightBatches.get(schema);
// now we're going to generate the sv4 pointers
- switch(schema.getSelectionVectorMode()){
+ switch (schema.getSelectionVectorMode()) {
case NONE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i =0; i < d.getRecordCount(); i++, index++) {
status.sv4.set(index, recordBatchId, i);
}
recordBatchId++;
@@ -99,8 +110,8 @@ public class MergeJoinBatchBuilder {
case TWO_BYTE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i =0; i < d.getRecordCount(); i++, index++) {
status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
}
// might as well drop the selection vector since we'll stop using it now.
@@ -121,7 +132,7 @@ public class MergeJoinBatchBuilder {
}
}
- for(MaterializedField f : vectors.keySet()){
+ for (MaterializedField f : vectors.keySet()) {
List<ValueVector> v = vectors.get(f);
container.addHyperList(v);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index cf2e36f..29fd80f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -133,7 +133,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
stats.startWait();
try {
RawFragmentBatch b = provider.getNext();
- if(b != null){
+ if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
}
@@ -191,7 +191,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
emptyBatch = rawBatch;
}
try {
- while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+ while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+ ;
+ }
if (rawBatch == null && context.isCancelled()) {
return IterOutcome.STOP;
}
@@ -400,14 +402,17 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
batchOffsets[node.batchId] = 0;
// add front value from batch[x] to priority queue
- if (batchLoaders[node.batchId].getRecordCount() != 0)
+ if (batchLoaders[node.batchId].getRecordCount() != 0) {
pqueue.add(new Node(node.batchId, 0));
+ }
} else {
pqueue.add(new Node(node.batchId, node.valueIndex + 1));
}
- if (prevBatchWasFull) break;
+ if (prevBatchWasFull) {
+ break;
+ }
}
// set the value counts in the outgoing vectors
@@ -589,11 +594,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for(Ordering od : popConfig.getOrderings()){
+ for (Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(LEFT_MAPPING);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
@@ -605,9 +612,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
- }else{
+ } else {
jc._then()._return(out.getValue().minus());
}
}
@@ -648,7 +655,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void cleanup() {
outgoingContainer.clear();
if (batchLoaders != null) {
- for(RecordBatchLoader rbl : batchLoaders){
+ for (RecordBatchLoader rbl : batchLoaders) {
if (rbl != null) {
rbl.clear();
}
@@ -662,4 +669,4 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 45f32cf..aecf363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -189,8 +189,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
- if (upstream == IterOutcome.NONE)
+ if (upstream == IterOutcome.NONE) {
break;
+ }
}
VectorContainer sortedSamples = new VectorContainer();
builder.build(context, sortedSamples);
@@ -258,7 +259,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
- if (!saveSamples()){
+ if (!saveSamples()) {
return false;
}
@@ -277,16 +278,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed
// TODO: this should be polling.
- if (val < fragmentsBeforeProceed)
+ if (val < fragmentsBeforeProceed) {
Thread.sleep(10);
+ }
for (int i = 0; i < 100 && finalTable == null; i++) {
finalTable = tableMap.get(finalTableKey);
- if (finalTable != null){
+ if (finalTable != null) {
break;
}
Thread.sleep(10);
}
- if (finalTable == null){
+ if (finalTable == null) {
buildTable();
}
finalTable = tableMap.get(finalTableKey);
@@ -429,8 +431,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
// done
- if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
+ if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) {
return IterOutcome.NONE;
+ }
// if there are batches on the queue, process them first, rather than calling incoming.next()
if (batchQueue != null && batchQueue.size() > 0) {
@@ -461,7 +464,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// If this is the first iteration, we need to generate the partition vectors before we can proceed
if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
- if (!getPartitionVectors()){
+ if (!getPartitionVectors()) {
cleanup();
return IterOutcome.STOP;
}
@@ -490,8 +493,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
if (this.startedUnsampledBatches == false) {
this.startedUnsampledBatches = true;
- if (upstream == IterOutcome.OK)
+ if (upstream == IterOutcome.OK) {
upstream = IterOutcome.OK_NEW_SCHEMA;
+ }
}
switch (upstream) {
case NONE:
@@ -560,8 +564,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
int count = 0;
for (Ordering od : popConfig.getOrderings()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
cg.setMappingSet(incomingMapping);
ClassGenerator.HoldingContainer left = cg.addExpr(expr, false);
cg.setMappingSet(partitionMapping);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 051a590..7f3a966 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -120,7 +120,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
@Override
public void run() {
try {
- if (stop) return;
+ if (stop) {
+ return;
+ }
outer:
while (true) {
IterOutcome upstream = incoming.next();
@@ -208,4 +210,5 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
this.failed = failed;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index ec29cac..a1a8340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -195,55 +195,64 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean doAlloc() {
//Allocate vv in the allocationVectors.
- for(ValueVector v : this.allocationVectors){
+ for (ValueVector v : this.allocationVectors) {
//AllocationHelper.allocate(v, remainingRecordCount, 250);
- if (!v.allocateNewSafe())
+ if (!v.allocateNewSafe()) {
return false;
+ }
}
//Allocate vv for complexWriters.
- if (complexWriters == null)
+ if (complexWriters == null) {
return true;
+ }
- for (ComplexWriter writer : complexWriters)
+ for (ComplexWriter writer : complexWriters) {
writer.allocate();
+ }
return true;
}
private void setValueCount(int count) {
- for(ValueVector v : allocationVectors){
+ for (ValueVector v : allocationVectors) {
ValueVector.Mutator m = v.getMutator();
m.setValueCount(count);
}
- if (complexWriters == null)
+ if (complexWriters == null) {
return;
+ }
- for (ComplexWriter writer : complexWriters)
+ for (ComplexWriter writer : complexWriters) {
writer.setValueCount(count);
+ }
}
/** hack to make ref and full work together... need to figure out if this is still necessary. **/
- private FieldReference getRef(NamedExpression e){
+ private FieldReference getRef(NamedExpression e) {
FieldReference ref = e.getRef();
PathSegment seg = ref.getRootSegment();
-// if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
+// if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
// return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
// }
return ref;
}
- private boolean isAnyWildcard(List<NamedExpression> exprs){
- for(NamedExpression e : exprs){
- if(isWildcard(e)) return true;
+ private boolean isAnyWildcard(List<NamedExpression> exprs) {
+ for (NamedExpression e : exprs) {
+ if (isWildcard(e)) {
+ return true;
+ }
}
return false;
}
- private boolean isWildcard(NamedExpression ex){
- if( !(ex.getExpr() instanceof SchemaPath)) return false;
+ private boolean isWildcard(NamedExpression ex) {
+ if ( !(ex.getExpr() instanceof SchemaPath)) {
+ return false;
+ }
NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
NameSegment ref = ex.getRef().getRootSegment();
return ref.getPath().equals("*") && expr.getPath().equals("*");
@@ -266,7 +275,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
ClassifierResult result = new ClassifierResult();
boolean classify = isClassificationNeeded(exprs);
- for(int i = 0; i < exprs.size(); i++){
+ for (int i = 0; i < exprs.size(); i++) {
final NamedExpression namedExpression = exprs.get(i);
result.clear();
@@ -278,14 +287,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
Integer value = result.prefixMap.get(result.prefix);
if (value != null && value.intValue() == 1) {
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
String name = result.outputNames.get(k++); // get the renamed column names
- if (name == EMPTY_STRING) continue;
+ if (name == EMPTY_STRING) {
+ continue;
+ }
FieldReference ref = new FieldReference(name);
TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
@@ -293,17 +304,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
} else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
String name = result.outputNames.get(k++); // get the renamed column names
- if (name == EMPTY_STRING) continue;
+ if (name == EMPTY_STRING) {
+ continue;
+ }
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() );
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -333,16 +346,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
&& !((ValueVectorReadExpression) expr).hasReadPath()
&& !isAnyWildcard
- && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
- ) {
+ && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
TypedFieldId id = vectorRead.getFieldId();
@@ -358,8 +370,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
// Need to process ComplexWriter function evaluation.
// Lazy initialization of the list of complex writers, if not done yet.
- if (complexWriters == null)
+ if (complexWriters == null) {
complexWriters = Lists.newArrayList();
+ }
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
@@ -419,9 +432,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean isClassificationNeeded(List<NamedExpression> exprs) {
boolean needed = false;
- for(int i = 0; i < exprs.size(); i++){
+ for (int i = 0; i < exprs.size(); i++) {
final NamedExpression ex = exprs.get(i);
- if (!(ex.getExpr() instanceof SchemaPath)) continue;
+ if (!(ex.getExpr() instanceof SchemaPath)) {
+ continue;
+ }
NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
NameSegment ref = ex.getRef().getRootSegment();
boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
@@ -530,7 +545,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
result.outputNames.add(EMPTY_STRING); // initialize
}
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
// get the prefix of the name
@@ -586,7 +601,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
result.outputNames.add(EMPTY_STRING); // initialize
}
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String name = vvIn.getField().getPath().getRootSegment().getPath();
String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
@@ -627,7 +642,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index b36bd92..49ad390 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -39,27 +39,25 @@ public abstract class ProjectorTemplate implements Projector {
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
- public ProjectorTemplate() throws SchemaChangeException{
+ public ProjectorTemplate() throws SchemaChangeException {
}
@Override
public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) {
- switch(svMode){
+ switch (svMode) {
case FOUR_BYTE:
throw new UnsupportedOperationException();
-
case TWO_BYTE:
final int count = recordCount;
- for(int i = 0; i < count; i++, firstOutputIndex++){
- if (!doEval(vector2.getIndex(i), firstOutputIndex))
+ for (int i = 0; i < count; i++, firstOutputIndex++) {
+ if (!doEval(vector2.getIndex(i), firstOutputIndex)) {
return i;
+ }
}
return recordCount;
-
case NONE:
-
final int countN = recordCount;
int i;
for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
@@ -68,18 +66,16 @@ public abstract class ProjectorTemplate implements Projector {
}
}
if (i < startIndex + recordCount || startIndex > 0) {
- for(TransferPair t : transfers){
+ for (TransferPair t : transfers) {
t.splitAndTransfer(startIndex, i - startIndex);
}
return i - startIndex;
}
- for(TransferPair t : transfers){
+ for (TransferPair t : transfers) {
t.transfer();
}
return recordCount;
-
-
default:
throw new UnsupportedOperationException();
}
@@ -89,7 +85,7 @@ public abstract class ProjectorTemplate implements Projector {
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
this.svMode = incoming.getSchema().getSelectionVectorMode();
- switch(svMode){
+ switch (svMode) {
case FOUR_BYTE:
this.vector4 = incoming.getSelectionVector4();
break;
@@ -104,8 +100,4 @@ public abstract class ProjectorTemplate implements Projector {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 8116869..419dc85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -40,7 +40,7 @@ public class RecordBatchData {
private int recordCount;
VectorContainer container = new VectorContainer();
- public RecordBatchData(VectorAccessible batch){
+ public RecordBatchData(VectorAccessible batch) {
List<ValueVector> vectors = Lists.newArrayList();
if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone();
@@ -48,8 +48,10 @@ public class RecordBatchData {
this.sv2 = null;
}
- for(VectorWrapper<?> v : batch){
- if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ for (VectorWrapper<?> v : batch) {
+ if (v.isHyper()) {
+ throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ }
TransferPair tp = v.getValueVector().getTransferPair();
tp.transfer();
vectors.add(tp.getTo());
@@ -67,9 +69,10 @@ public class RecordBatchData {
container.buildSchema(mode);
}
- public int getRecordCount(){
+ public int getRecordCount() {
return recordCount;
}
+
public List<ValueVector> getVectors() {
List<ValueVector> vectors = Lists.newArrayList();
for (VectorWrapper w : container) {
@@ -91,4 +94,5 @@ public class RecordBatchData {
public VectorContainer getContainer() {
return container;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 3a37491..19f5423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -82,8 +82,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return builder.getSv4();
}
-
-
@Override
public void cleanup() {
builder.clear();
@@ -93,15 +91,14 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
-
try{
outer: while (true) {
IterOutcome upstream = incoming.next();
@@ -114,13 +111,15 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
}
// fall through.
case OK:
- if(!builder.add(incoming)){
+ if (!builder.add(incoming)) {
throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
};
break;
@@ -129,7 +128,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
}
- if (schema == null || builder.isEmpty()){
+ if (schema == null || builder.isEmpty()) {
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
}
@@ -141,7 +140,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -167,11 +166,13 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for(Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -183,7 +184,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
@@ -193,8 +194,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
g.getEvalBlock()._return(JExpr.lit(0));
return context.getImplementationClass(cg);
-
-
}
@Override
@@ -207,7 +206,4 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
incoming.kill(sendUpstream);
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 80b4ef6..707c41c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -49,14 +49,14 @@ public class SortRecordBatchBuilder {
private SelectionVector4 sv4;
final PreAllocator svAllocator;
- public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){
+ public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) {
this.maxBytes = maxBytes;
this.svAllocator = a.getNewPreAllocator();
}
- private long getSize(VectorAccessible batch){
+ private long getSize(VectorAccessible batch) {
long bytes = 0;
- for(VectorWrapper<?> v : batch){
+ for (VectorWrapper<?> v : batch) {
bytes += v.getValueVector().getBufferSize();
}
return bytes;
@@ -68,8 +68,10 @@ public class SortRecordBatchBuilder {
* @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
* @throws SchemaChangeException
*/
- public boolean add(VectorAccessible batch){
- if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ public boolean add(VectorAccessible batch) {
+ if (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
+ throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ }
if (batch.getRecordCount() == 0 && batches.size() > 0) {
return true; // skip over empty record batches.
}
@@ -78,9 +80,15 @@ public class SortRecordBatchBuilder {
if (batchBytes == 0 && batches.size() > 0) {
return true;
}
- if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
- if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
- if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+ if (batchBytes + runningBytes > maxBytes) {
+ return false; // enough data memory.
+ }
+ if (runningBatches+1 > Character.MAX_VALUE) {
+ return false; // allowed in batch.
+ }
+ if (!svAllocator.preAllocate(batch.getRecordCount()*4)) {
+ return false; // sv allocation available.
+ }
RecordBatchData bd = new RecordBatchData(batch);
@@ -126,15 +134,19 @@ public class SortRecordBatchBuilder {
}
}
- public boolean isEmpty(){
+ public boolean isEmpty() {
return batches.isEmpty();
}
public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{
outputContainer.clear();
- if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
- if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
- if(batches.keys().size() < 1){
+ if (batches.keySet().size() > 1) {
+ throw new SchemaChangeException("Sort currently only supports a single schema.");
+ }
+ if (batches.size() > Character.MAX_VALUE) {
+ throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ }
+ if (batches.keys().size() < 1) {
assert false : "Invalid to have an empty set of batches with no schemas.";
}
sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
@@ -142,12 +154,12 @@ public class SortRecordBatchBuilder {
List<RecordBatchData> data = batches.get(schema);
// now we're going to generate the sv4 pointers
- switch(schema.getSelectionVectorMode()){
+ switch (schema.getSelectionVectorMode()) {
case NONE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i =0; i < d.getRecordCount(); i++, index++) {
sv4.set(index, recordBatchId, i);
}
recordBatchId++;
@@ -157,8 +169,8 @@ public class SortRecordBatchBuilder {
case TWO_BYTE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i =0; i < d.getRecordCount(); i++, index++) {
sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
}
// might as well drop the selection vector since we'll stop using it now.
@@ -173,13 +185,13 @@ public class SortRecordBatchBuilder {
// next, we'll create lists of each of the vector types.
ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
- for(RecordBatchData rbd : batches.values()){
- for(ValueVector v : rbd.getVectors()){
+ for (RecordBatchData rbd : batches.values()) {
+ for (ValueVector v : rbd.getVectors()) {
vectors.put(v.getField(), v);
}
}
- for(MaterializedField f : schema){
+ for (MaterializedField f : schema) {
List<ValueVector> v = vectors.get(f);
outputContainer.addHyperList(v, false);
}
@@ -191,11 +203,13 @@ public class SortRecordBatchBuilder {
return sv4;
}
- public void clear(){
- for(RecordBatchData d : batches.values()){
+ public void clear() {
+ for (RecordBatchData d : batches.values()) {
d.container.clear();
}
- if(sv4 != null) sv4.clear();
+ if (sv4 != null) {
+ sv4.clear();
+ }
}
public List<VectorContainer> getHeldRecordBatches() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 609cb29..6d90962 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -88,10 +88,11 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
public int getRecordCount() {
- if (sv == null)
+ if (sv == null) {
return incoming.getRecordCount();
- else
+ } else {
return sv.getCount();
+ }
}
/**
@@ -125,8 +126,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
protected void setupNewSchema() throws SchemaChangeException {
/* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+ }
/*
* we have a new schema, clear our existing container to load the new value vectors
@@ -152,8 +154,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
public void cleanup() {
/* Release the selection vector */
- if (sv != null)
+ if (sv != null) {
sv.clear();
+ }
/* Close the file descriptors */
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 0e69bcf..171d12c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -111,7 +111,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
@Override
public IterOutcome next() {
- if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
+ if (state == IterOutcome.NONE ) {
+ throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
+ }
state = incoming.next();
if (first && state == IterOutcome.NONE) {
throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE");
@@ -119,14 +121,16 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
if (first && state == IterOutcome.OK) {
throw new IllegalStateException("The incoming iterator returned a state of OK on the first batch. There should always be a new schema on the first batch. Incoming: " + incoming.getClass().getName());
}
- if (first) first = !first;
+ if (first) {
+ first = !first;
+ }
- if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
+ if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
BatchSchema schema = incoming.getSchema();
- if(schema.getFieldCount() == 0){
+ if (schema.getFieldCount() == 0) {
throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
}
- if(incoming.getRecordCount() > MAX_BATCH_SIZE){
+ if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
}
@@ -157,4 +161,5 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 428f335..2f7f531 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -36,10 +36,11 @@ public class IteratorValidatorInjector extends
IteratorValidatorInjector inject = new IteratorValidatorInjector();
PhysicalOperator newOp = root.accept(inject, context);
- if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen.");
+ if ( !(newOp instanceof FragmentRoot) ) {
+ throw new IllegalStateException("This shouldn't happen.");
+ }
return (FragmentRoot) newOp;
-
}
/**
@@ -67,12 +68,11 @@ public class IteratorValidatorInjector extends
}
/* Inject trace operator */
- if (newChildren.size() > 0){
+ if (newChildren.size() > 0) {
newOp = op.getNewWithChildren(newChildren);
newOp.setOperatorId(op.getOperatorId());
}
-
return newOp;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 2370070..9359ea1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,14 +143,24 @@ public class BatchGroup implements VectorAccessible {
}
public void cleanup() throws IOException {
- if (sv2 != null) sv2.clear();
- if (outputStream != null) outputStream.close();
- if (inputStream != null) inputStream.close();
- if (fs != null && fs.exists(path)) fs.delete(path, false);
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ if (fs != null && fs.exists(path)) {
+ fs.delete(path, false);
+ }
}
public void closeOutputStream() throws IOException {
- if (outputStream != null) outputStream.close();
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
@Override
@@ -181,4 +191,5 @@ public class BatchGroup implements VectorAccessible {
public Iterator<VectorWrapper<?>> iterator() {
return currentContainer.iterator();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 505f567..52249e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -192,12 +192,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
+ if (schema != null) {
if (spillCount == 0) {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
@@ -206,12 +206,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
w.start();
// int count = selector.next();
int count = copier.next(targetRecordCount);
- if(count > 0){
+ if (count > 0) {
long t = w.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to merge {} records", t, count);
container.setRecordCount(count);
return IterOutcome.OK;
- }else{
+ } else {
logger.debug("copier returned 0 records");
return IterOutcome.NONE;
}
@@ -236,8 +236,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
this.sorter = createNewSorter(context, incoming);
}
@@ -249,7 +251,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
break;
}
- if (first) first = false;
+ if (first) {
+ first = false;
+ }
totalSizeInMemory += getBufferSize(incoming);
SelectionVector2 sv2;
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
@@ -291,7 +295,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
break;
case OUT_OF_MEMORY:
highWaterMark = totalSizeInMemory;
- if (batchesSinceLastSpill > 2) mergeAndSpill();
+ if (batchesSinceLastSpill > 2) {
+ mergeAndSpill();
+ }
batchesSinceLastSpill = 0;
break;
default:
@@ -348,7 +354,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -502,11 +508,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ClassGenerator<MSorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -518,7 +526,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
@@ -547,11 +555,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for(Ordering od : popConfig.getOrderings()){
+ for (Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(LEFT_MAPPING);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
@@ -563,7 +573,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
@@ -590,7 +600,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
List<VectorAllocator> allocators = Lists.newArrayList();
- for(VectorWrapper<?> i : batch){
+ for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator);
outputContainer.add(v);
allocators.add(VectorAllocator.getAllocator(v, 110));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index df79b1a..3fd744f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -84,7 +84,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
while (l < rightStart) {
aux.set(o++, vector4.get(l++));
}
- while (r < rightEnd){
+ while (r < rightEnd) {
aux.set(o++, vector4.get(r++));
}
assert o == outStart + (rightEnd - leftStart);
@@ -97,7 +97,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
@Override
- public void sort(VectorContainer container){
+ public void sort(VectorContainer container) {
Stopwatch watch = new Stopwatch();
watch.start();
while (runStarts.size() > 1) {
@@ -109,9 +109,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
int left = runStarts.poll();
int right = runStarts.poll();
Integer end = runStarts.peek();
- if (end == null) end = vector4.getTotalCount();
+ if (end == null) {
+ end = vector4.getTotalCount();
+ }
outIndex = merge(left, right, end, outIndex);
- if (outIndex < vector4.getTotalCount()) newRunStarts.add(outIndex);
+ if (outIndex < vector4.getTotalCount()) {
+ newRunStarts.add(outIndex);
+ }
}
if (outIndex < vector4.getTotalCount()) {
copyRun(outIndex, vector4.getTotalCount());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
index aa2f786..9beef39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
@@ -35,8 +35,9 @@ public class StarColumnHelper {
List<String> fieldNames = type.getFieldNames();
for (String s : fieldNames) {
- if (s.startsWith(STAR_COLUMN))
+ if (s.startsWith(STAR_COLUMN)) {
return true;
+ }
}
return false;
@@ -71,8 +72,9 @@ public class StarColumnHelper {
// Given a set of prefixes, check if a regular column is subsumed by any of the prefixed star column in the set.
public static boolean subsumeRegColumn(Set<String> prefixes, String fieldName) {
- if (isPrefixedStarColumn(fieldName))
+ if (isPrefixedStarColumn(fieldName)) {
return false; // only applies to regular column.
+ }
return prefixes.contains(extractColumnPrefix(fieldName));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
index c33bb22..87a1ea3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
@@ -130,25 +130,25 @@ public class DrillCostBase implements DrillRelOptCost {
this.memory = memory;
}
- @Override
- public double getRows() {
- return rowCount;
- }
-
- @Override
- public double getCpu() {
- return cpu;
- }
-
- @Override
- public double getIo() {
- return io;
- }
-
- @Override
- public double getNetwork() {
- return network;
- }
+ @Override
+ public double getRows() {
+ return rowCount;
+ }
+
+ @Override
+ public double getCpu() {
+ return cpu;
+ }
+
+ @Override
+ public double getIo() {
+ return io;
+ }
+
+ @Override
+ public double getNetwork() {
+ return network;
+ }
public double getMemory() {
return memory;
@@ -159,31 +159,31 @@ public class DrillCostBase implements DrillRelOptCost {
return Util.hashCode(rowCount) + Util.hashCode(cpu) + Util.hashCode(io) + Util.hashCode(network);
}
- @Override
- public boolean isInfinite() {
+ @Override
+ public boolean isInfinite() {
return (this == INFINITY)
|| (this.cpu == Double.POSITIVE_INFINITY)
|| (this.io == Double.POSITIVE_INFINITY)
|| (this.network == Double.POSITIVE_INFINITY)
|| (this.rowCount == Double.POSITIVE_INFINITY);
- }
-
- @Override
- public boolean equals(RelOptCost other) {
- // here we compare the individual components similar to VolcanoCost, however
- // an alternative would be to add up the components and compare the total.
- // Note that VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons,
- // not equals().
+ }
+
+ @Override
+ public boolean equals(RelOptCost other) {
+ // here we compare the individual components similar to VolcanoCost, however
+ // an alternative would be to add up the components and compare the total.
+ // Note that VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons,
+ // not equals().
return this == other
|| (other instanceof DrillCostBase
&& (this.cpu == ((DrillCostBase) other).cpu)
&& (this.io == ((DrillCostBase) other).io)
&& (this.network == ((DrillCostBase) other).network)
&& (this.rowCount == ((DrillCostBase) other).rowCount));
- }
+ }
- @Override
- public boolean isEqWithEpsilon(RelOptCost other) {
+ @Override
+ public boolean isEqWithEpsilon(RelOptCost other) {
if (!(other instanceof DrillCostBase)) {
return false;
}
@@ -193,7 +193,7 @@ public class DrillCostBase implements DrillRelOptCost {
&& (Math.abs(this.io - that.io) < RelOptUtil.EPSILON)
&& (Math.abs(this.network - that.network) < RelOptUtil.EPSILON)
&& (Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON));
- }
+ }
@Override
public boolean isLe(RelOptCost other) {
@@ -216,8 +216,8 @@ public class DrillCostBase implements DrillRelOptCost {
);
}
- @Override
- public RelOptCost plus(RelOptCost other) {
+ @Override
+ public RelOptCost plus(RelOptCost other) {
DrillCostBase that = (DrillCostBase) other;
if ((this == INFINITY) || (that == INFINITY)) {
return INFINITY;
@@ -228,10 +228,10 @@ public class DrillCostBase implements DrillRelOptCost {
this.io + that.io,
this.network + that.network,
this.memory + that.memory);
- }
+ }
- @Override
- public RelOptCost minus(RelOptCost other) {
+ @Override
+ public RelOptCost minus(RelOptCost other) {
if (this == INFINITY) {
return this;
}
@@ -242,18 +242,18 @@ public class DrillCostBase implements DrillRelOptCost {
this.io - that.io,
this.network - that.network,
this.memory - that.memory);
- }
+ }
- @Override
- public RelOptCost multiplyBy(double factor) {
+ @Override
+ public RelOptCost multiplyBy(double factor) {
if (this == INFINITY) {
return this;
}
return new DrillCostBase(rowCount * factor, cpu * factor, io * factor, network * factor);
- }
+ }
- @Override
- public double divideBy(RelOptCost cost) {
+ @Override
+ public double divideBy(RelOptCost cost) {
// Compute the geometric average of the ratios of all of the factors
// which are non-zero and finite.
DrillCostBase that = (DrillCostBase) cost;
@@ -292,7 +292,7 @@ public class DrillCostBase implements DrillRelOptCost {
return 1.0;
}
return Math.pow(d, 1 / n);
- }
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
index 88e4e28..73c6c72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
@@ -22,7 +22,6 @@ import org.eigenbase.relopt.RelOptCost;
public interface DrillRelOptCost extends RelOptCost {
- double getNetwork();
+ double getNetwork();
}
-
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index 47d6f14..e527960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import com.google.common.collect.Lists;
-public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
+public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class);
private PhysicalOperator root;
@@ -34,19 +34,21 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList();
private Stats stats = new Stats();
- public void addOperator(PhysicalOperator o){
- if(root == null){
+ public void addOperator(PhysicalOperator o) {
+ if (root == null) {
root = o;
}
}
public void addSendExchange(Exchange e) throws FragmentSetupException{
- if(sendingExchange != null) throw new FragmentSetupException("Fragment was trying to add a second SendExchange. ");
+ if (sendingExchange != null) {
+ throw new FragmentSetupException("Fragment was trying to add a second SendExchange. ");
+ }
addOperator(e);
sendingExchange = e;
}
- public void addReceiveExchange(Exchange e, Fragment fragment){
+ public void addReceiveExchange(Exchange e, Fragment fragment) {
this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment));
}
@@ -67,28 +69,32 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
return sendingExchange;
}
-// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra){
+// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) {
// return visitor.visit(this, extra);
// }
- public Stats getStats(){
+ public Stats getStats() {
return stats;
}
public class ExchangeFragmentPair {
private Exchange exchange;
private Fragment node;
+
public ExchangeFragmentPair(Exchange exchange, Fragment node) {
super();
this.exchange = exchange;
this.node = node;
}
+
public Exchange getExchange() {
return exchange;
}
+
public Fragment getNode() {
return node;
}
+
@Override
public int hashCode() {
final int prime = 31;
@@ -97,13 +103,12 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
result = prime * result + ((node == null) ? 0 : node.hashCode());
return result;
}
+
@Override
public String toString() {
return "ExchangeFragmentPair [exchange=" + exchange + "]";
}
-
-
}
@Override
@@ -119,22 +124,44 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
@Override
public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
Fragment other = (Fragment) obj;
if (receivingExchangePairs == null) {
- if (other.receivingExchangePairs != null) return false;
- } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) return false;
+ if (other.receivingExchangePairs != null) {
+ return false;
+ }
+ } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) {
+ return false;
+ }
if (root == null) {
- if (other.root != null) return false;
- } else if (!root.equals(other.root)) return false;
+ if (other.root != null) {
+ return false;
+ }
+ } else if (!root.equals(other.root)) {
+ return false;
+ }
if (sendingExchange == null) {
- if (other.sendingExchange != null) return false;
- } else if (!sendingExchange.equals(other.sendingExchange)) return false;
+ if (other.sendingExchange != null) {
+ return false;
+ }
+ } else if (!sendingExchange.equals(other.sendingExchange)) {
+ return false;
+ }
if (stats == null) {
- if (other.stats != null) return false;
- } else if (!stats.equals(other.stats)) return false;
+ if (other.stats != null) {
+ return false;
+ }
+ } else if (!stats.equals(other.stats)) {
+ return false;
+ }
return true;
}
@@ -144,6 +171,4 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{
+ receivingExchangePairs + ", stats=" + stats + "]";
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index 690fe45..594356a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -30,13 +30,15 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class);
- public MakeFragmentsVisitor(){
+ public MakeFragmentsVisitor() {
}
@Override
public Fragment visitExchange(Exchange exchange, Fragment value) throws FragmentSetupException {
// logger.debug("Visiting Exchange {}", exchange);
- if(value == null) throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
+ if (value == null) {
+ throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
+ }
Fragment next = getNextBuilder();
value.addReceiveExchange(exchange, next);
next.addSendExchange(exchange);
@@ -55,21 +57,21 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
// logger.debug("Visiting Other {}", op);
value = ensureBuilder(value);
value.addOperator(op);
- for(PhysicalOperator child : op){
+ for (PhysicalOperator child : op) {
child.accept(this, value);
}
return value;
}
private Fragment ensureBuilder(Fragment value) throws FragmentSetupException{
- if(value != null){
+ if (value != null) {
return value;
- }else{
+ } else {
return getNextBuilder();
}
}
- public Fragment getNextBuilder(){
+ public Fragment getNextBuilder() {
return new Fragment();
}