You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/09/12 17:17:41 UTC
[32/37] DRILL-1402: Add check-style rules for trailing space, TABs and blocks without braces
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index 26d881d..f6e11c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -63,8 +63,9 @@ public class SingleMergeExchange extends AbstractExchange {
protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations)
throws PhysicalOperatorSetupException {
- if (receiverLocations.size() != 1)
+ if (receiverLocations.size() != 1) {
throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint");
+ }
receiverLocation = receiverLocations.iterator().next();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index cafdbdd..bf2b4a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -48,7 +48,9 @@ public class UnionExchange extends AbstractExchange{
@Override
protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
- if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+ if (receiverLocations.size() != 1) {
+ throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+ }
this.destinationLocation = receiverLocations.iterator().next();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7f97624..e25f1c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -41,9 +41,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private RootExec root = null;
- private ImplCreator(){}
+ private ImplCreator() {}
- private RootExec getRoot(){
+ private RootExec getRoot() {
return root;
}
@@ -78,7 +78,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
ImplCreator i = new ImplCreator();
- if(AssertionUtil.isAssertionsEnabled()){
+ if (AssertionUtil.isAssertionsEnabled()) {
root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
}
@@ -86,9 +86,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
watch.start();
root.accept(i, context);
logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
- if (i.root == null)
+ if (i.root == null) {
throw new ExecutionSetupException(
"The provided fragment did not have a root node that correctly created a RootExec value.");
+ }
return i.getRoot();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
index 8c768e5..82a9a63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -42,7 +42,9 @@ public class OperatorCreatorRegistry {
public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
Object opCreator = instanceRegistry.get(operator);
- if (opCreator != null) return opCreator;
+ if (opCreator != null) {
+ return opCreator;
+ }
Constructor<?> c = constructorRegistry.get(operator);
if(c == null) {
@@ -75,9 +77,9 @@ public class OperatorCreatorRegistry {
Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
interfaceFound = true;
boolean constructorFound = false;
- for(Constructor<?> constructor : operatorClass.getConstructors()){
+ for (Constructor<?> constructor : operatorClass.getConstructors()) {
Class<?>[] params = constructor.getParameterTypes();
- if(params.length == 0){
+ if (params.length == 0) {
Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
if (old != null) {
throw new RuntimeException(
@@ -88,7 +90,7 @@ public class OperatorCreatorRegistry {
constructorFound = true;
}
}
- if(!constructorFound){
+ if (!constructorFound) {
logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
operatorClass.getCanonicalName());
}
@@ -97,4 +99,5 @@ public class OperatorCreatorRegistry {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c2a03b9..2712e27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,8 +83,9 @@ public class ScanBatch implements RecordBatch {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
- if (!readers.hasNext())
+ if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+ }
this.currentReader = readers.next();
this.oContext = new OperatorContext(subScanConfig, context);
this.currentReader.setOperatorContext(this.oContext);
@@ -121,7 +122,7 @@ public class ScanBatch implements RecordBatch {
@Override
public void kill(boolean sendUpstream) {
- if(currentReader != null){
+ if (currentReader != null) {
currentReader.cleanup();
}
@@ -220,8 +221,8 @@ public class ScanBatch implements RecordBatch {
private void addPartitionVectors() throws ExecutionSetupException{
try {
- if(partitionVectors != null){
- for(ValueVector v : partitionVectors){
+ if (partitionVectors != null) {
+ for (ValueVector v : partitionVectors) {
v.clear();
}
}
@@ -290,7 +291,9 @@ public class ScanBatch implements RecordBatch {
if (v == null || v.getClass() != clazz) {
// Field does not exist add it to the map and the output container
v = TypeHelper.getNewVector(field, oContext.getAllocator());
- if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ if (!clazz.isAssignableFrom(v.getClass())) {
+ throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
container.add(v);
fieldVectorMap.put(field.key(), v);
@@ -342,9 +345,9 @@ public class ScanBatch implements RecordBatch {
return WritableBatch.get(this);
}
- public void cleanup(){
+ public void cleanup() {
container.clear();
- for(ValueVector v : partitionVectors){
+ for (ValueVector v : partitionVectors) {
v.clear();
}
fieldVectorMap.clear();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2b7fdf3..352deae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -79,7 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public boolean innerNext() {
- if(!ok){
+ if (!ok) {
incoming.kill(false);
return false;
@@ -93,7 +93,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
out = IterOutcome.NONE;
}
// logger.debug("Outcome of sender next {}", out);
- switch(out){
+ switch (out) {
case STOP:
case NONE:
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
@@ -158,7 +158,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void success(Ack value, ByteBuf buf) {
sendCount.decrement();
- if(value.getOk()) return;
+ if (value.getOk()) {
+ return;
+ }
logger.error("Downstream fragment was not accepted. Stopping future sends.");
// if we didn't get ack ok, we'll need to kill the query.
@@ -170,5 +172,4 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 6eede30..473e3a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -132,10 +132,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
@@ -156,8 +156,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
}
// fall through.
@@ -181,7 +183,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
}
- if (schema == null){
+ if (schema == null) {
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
}
@@ -196,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -215,7 +217,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
if (copier == null) {
copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch);
} else {
- for(VectorWrapper<?> i : batch){
+ for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
newContainer.add(v);
@@ -227,7 +229,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
int count = selectionVector4.getCount();
int copiedRecords = copier.copyRecords(0, count);
assert copiedRecords == count;
- for(VectorWrapper<?> v : newContainer){
+ for (VectorWrapper<?> v : newContainer) {
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(count);
}
@@ -253,11 +255,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
ClassGenerator<PriorityQueue> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -269,9 +273,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
- }else{
+ } else {
jc._then()._return(out.getValue().minus());
}
g.rotateBlock();
@@ -377,5 +381,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 58dd247..92d1882 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -82,10 +82,12 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
}
/* Inject trace operator */
- if (list.size() > 0)
- newOp = op.getNewWithChildren(list);
- newOp.setOperatorId(op.getOperatorId());
+ if (list.size() > 0) {
+ newOp = op.getNewWithChildren(list);
+ }
+ newOp.setOperatorId(op.getOperatorId());
return newOp;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 99eeed3..8c1a4c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -82,8 +82,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
IterOutcome upstream;
do {
upstream = next(incoming);
- if(first && upstream == IterOutcome.OK)
+ if(first && upstream == IterOutcome.OK) {
upstream = IterOutcome.OK_NEW_SCHEMA;
+ }
first = false;
switch(upstream) {
@@ -91,14 +92,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
case NONE:
case STOP:
cleanup();
- if (upstream == IterOutcome.STOP)
+ if (upstream == IterOutcome.STOP) {
return upstream;
+ }
break;
case OK_NEW_SCHEMA:
try{
setupNewSchema();
- }catch(Exception ex){
+ } catch(Exception ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -113,9 +115,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
throw new RuntimeException(ex);
}
- for(VectorWrapper v : incoming)
+ for(VectorWrapper v : incoming) {
v.getValueVector().clear();
-
+ }
break;
default:
@@ -176,4 +178,5 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
throw new RuntimeException("Failed to close RecordWriter", ex);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index e9be2ac..c522870 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -82,7 +82,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
@Override
public int getRecordCount() {
- if(done) return 0;
+ if (done) {
+ return 0;
+ }
return aggregator.getOutputCount();
}
@@ -102,7 +104,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
case STOP:
return outcome;
case OK_NEW_SCHEMA:
- if (!createAggregator()){
+ if (!createAggregator()) {
done = true;
return IterOutcome.STOP;
}
@@ -131,10 +133,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
- while(true){
+ while (true) {
AggOutcome out = aggregator.doWork();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
- switch(out){
+ switch (out) {
case CLEANUP_AND_RETURN:
container.zeroVectors();
aggregator.cleanup();
@@ -150,7 +152,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
aggregator = null;
- if(!createAggregator()){
+ if (!createAggregator()) {
return IterOutcome.STOP;
}
continue;
@@ -168,23 +170,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
*/
private boolean createAggregator() {
logger.debug("Creating new aggregator.");
- try{
+ try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
return true;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
context.fail(ex);
container.clear();
incoming.kill(false);
return false;
- }finally{
+ } finally {
stats.stopSetup();
}
}
private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
- CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<HashAggregator> cg = top.getRoot();
+ CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ ClassGenerator<HashAggregator> cg = top.getRoot();
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
container.clear();
@@ -199,10 +201,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
int i;
- for(i = 0; i < numGroupByExprs; i++) {
+ for (i = 0; i < numGroupByExprs; i++) {
NamedExpression ne = popConfig.getGroupByExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -211,13 +215,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
groupByOutFieldIds[i] = container.add(vv);
}
- for(i = 0; i < numAggrExprs; i++){
+ for (i = 0; i < numAggrExprs; i++) {
NamedExpression ne = popConfig.getAggrExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -248,7 +256,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return agg;
}
-
private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
cg.setMappingSet(UpdateAggrValuesMapping);
@@ -260,8 +267,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
- private void setupGetIndex(ClassGenerator<HashAggregator> cg){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE: {
JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b6b8874..d25a952 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -156,7 +156,9 @@ public abstract class HashAggTemplate implements HashAggregator {
boolean status = true;
for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
if (outputRecordValues(i, batchOutputCount) ) {
- if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+ }
batchOutputCount++;
outNumRecordsHolder.value++;
} else {
@@ -270,31 +272,41 @@ public abstract class HashAggTemplate implements HashAggregator {
outside: while(true) {
// loop through existing records, aggregating the values as necessary.
- if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
+ if (EXTRA_DEBUG_1) {
+ logger.debug ("Starting outer loop of doWork()...");
+ }
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ if(EXTRA_DEBUG_2) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ }
boolean success = checkGroupAndAggrValues(currentIndex);
assert success : "HashAgg couldn't copy values.";
}
- if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Processed {} records", underlyingIndex);
+ }
- try{
+ try {
- while(true){
+ while (true) {
// Cleanup the previous batch since we are done processing it.
for (VectorWrapper<?> v : incoming) {
v.getValueVector().clear();
}
IterOutcome out = outgoing.next(0, incoming);
- if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
- switch(out){
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received IterOutcome of {}", out);
+ }
+ switch (out) {
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
case OK_NEW_SCHEMA:
- if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
newSchema = true;
this.cleanup();
// TODO: new schema case needs to be handled appropriately
@@ -302,14 +314,16 @@ public abstract class HashAggTemplate implements HashAggregator {
case OK:
resetIndex();
- if(incoming.getRecordCount() == 0){
+ if (incoming.getRecordCount() == 0) {
continue;
} else {
boolean success = checkGroupAndAggrValues(currentIndex);
assert success : "HashAgg couldn't copy values.";
incIndex();
- if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Continuing outside loop");
+ }
continue outside;
}
@@ -343,8 +357,10 @@ public abstract class HashAggTemplate implements HashAggregator {
// placeholder...
}
}
- } finally{
- if(first) first = !first;
+ } finally {
+ if (first) {
+ first = !first;
+ }
}
}
@@ -373,7 +389,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@Override
- public void cleanup(){
+ public void cleanup() {
if (htable != null) {
htable.clear();
htable = null;
@@ -392,28 +408,28 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- private final AggOutcome setOkAndReturn(){
- if(first){
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
- for(VectorWrapper<?> v : outgoing){
+ for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(outputCount);
}
return AggOutcome.RETURN_OUTCOME;
}
- private final void incIndex(){
+ private final void incIndex() {
underlyingIndex++;
- if(underlyingIndex >= incoming.getRecordCount()){
+ if (underlyingIndex >= incoming.getRecordCount()) {
currentIndex = Integer.MAX_VALUE;
return;
}
currentIndex = getVectorIndex(underlyingIndex);
}
- private final void resetIndex(){
+ private final void resetIndex() {
underlyingIndex = -1;
incIndex();
}
@@ -422,7 +438,9 @@ public abstract class HashAggTemplate implements HashAggregator {
BatchHolder bh = new BatchHolder();
batchHolders.add(bh);
- if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ if (EXTRA_DEBUG_1) {
+ logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ }
bh.setup();
}
@@ -465,9 +483,9 @@ public abstract class HashAggTemplate implements HashAggregator {
outputCount += numOutputRecords;
- if(first){
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
@@ -486,14 +504,14 @@ public abstract class HashAggTemplate implements HashAggregator {
} else {
if (!outputKeysStatus) {
logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
- for(VectorWrapper<?> v : outContainer) {
+ for (VectorWrapper<?> v : outContainer) {
logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
}
context.fail(new Exception("Failed to output keys for current batch !"));
}
if (!outputValuesStatus) {
logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
- for(VectorWrapper<?> v : outContainer) {
+ for (VectorWrapper<?> v : outContainer) {
logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
}
context.fail(new Exception("Failed to output values for current batch !"));
@@ -557,7 +575,9 @@ public abstract class HashAggTemplate implements HashAggregator {
if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
- if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Group-by key already present in hash table, updating the aggregate values");
+ }
// debugging
//if (holder.value == 100018 || holder.value == 100021) {
@@ -566,7 +586,9 @@ public abstract class HashAggTemplate implements HashAggregator {
}
else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
- if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+ }
// debugging
// if (holder.value == 100018 || holder.value == 100021) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4277f23..238242b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -40,7 +40,7 @@ public interface HashAggregator {
public static enum AggOutcome {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
- }
+ }
public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
OperatorStats stats, BufferAllocator allocator, RecordBatch incoming,
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 3e6def1..e690060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -34,8 +34,8 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
private final SelectionVector2 sv2;
private final SelectionVector4 sv4;
- public InternalBatch(RecordBatch incoming){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ public InternalBatch(RecordBatch incoming) {
+ switch(incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE:
this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
this.sv2 = null;
@@ -69,13 +69,17 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
return container.iterator();
}
- public void clear(){
- if(sv2 != null) sv2.clear();
- if(sv4 != null) sv4.clear();
+ public void clear() {
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (sv4 != null) {
+ sv4.clear();
+ }
container.clear();
}
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds) {
return container.getValueAccessorById(clazz, fieldIds);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 820f722..ced5179 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,8 +67,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public int getRecordCount() {
- if(done) return 0;
- if (aggregator == null) return 0;
+ if (done) {
+ return 0;
+ }
+ if (aggregator == null) {
+ return 0;
+ }
return aggregator.getOutputCount();
}
@@ -88,7 +92,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
case STOP:
return outcome;
case OK_NEW_SCHEMA:
- if (!createAggregator()){
+ if (!createAggregator()) {
done = true;
return IterOutcome.STOP;
}
@@ -100,12 +104,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
}
- while(true){
+ while (true) {
AggOutcome out = aggregator.doWork();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
- switch(out){
+ switch (out) {
case CLEANUP_AND_RETURN:
- if (!first) container.zeroVectors();
+ if (!first) {
+ container.zeroVectors();
+ }
done = true;
// fall through
case RETURN_OUTCOME:
@@ -122,7 +128,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
case UPDATE_AGGREGATOR:
first = false;
aggregator = null;
- if(!createAggregator()){
+ if (!createAggregator()) {
return IterOutcome.STOP;
}
continue;
@@ -142,23 +148,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
*/
private boolean createAggregator() {
logger.debug("Creating new aggregator.");
- try{
+ try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
return true;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
context.fail(ex);
container.clear();
incoming.kill(false);
return false;
- }finally{
+ } finally {
stats.stopSetup();
}
}
-
-
-
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
container.clear();
@@ -169,20 +172,24 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
ErrorCollector collector = new ErrorCollectorImpl();
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
NamedExpression ne = popConfig.getKeys()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
keyExprs[i] = expr;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
keyOutputIds[i] = container.add(vector);
}
- for(int i =0; i < valueExprs.length; i++){
+ for (int i =0; i < valueExprs.length; i++) {
NamedExpression ne = popConfig.getExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -190,7 +197,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
}
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
setupIsSame(cg, keyExprs);
setupIsSameApart(cg, keyExprs);
@@ -207,15 +216,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
return agg;
}
-
-
private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
- private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+ private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(IS_SAME_I1);
- for(LogicalExpression expr : keyExprs){
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(IS_SAME_I1);
HoldingContainer first = cg.addExpr(expr, false);
@@ -234,9 +241,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
- private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+ private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(ISA_B1);
- for(LogicalExpression expr : keyExprs){
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(ISA_B1);
HoldingContainer first = cg.addExpr(expr, false);
@@ -254,9 +261,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
- private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
+ private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
cg.setMappingSet(EVAL);
- for(LogicalExpression ex : valueExprs){
+ for (LogicalExpression ex : valueExprs) {
HoldingContainer hc = cg.addExpr(ex);
cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
@@ -265,9 +272,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
- private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+ private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
cg.setMappingSet(RECORD_KEYS);
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
@@ -280,10 +287,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
- private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+ private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
cg.setMappingSet(RECORD_KEYS_PREV);
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
// IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this.
logger.debug("Writing out expr {}", keyExprs[i]);
cg.rotateBlock();
@@ -297,8 +304,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
- private void getIndex(ClassGenerator<StreamingAggregator> g){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ private void getIndex(ClassGenerator<StreamingAggregator> g) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE: {
JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 53ac1ed..c2a5715 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -60,7 +60,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
private void allocateOutgoing() {
- for(VectorWrapper<?> w : outgoing){
+ for (VectorWrapper<?> w : outgoing) {
w.getValueVector().allocateNew();
}
}
@@ -75,7 +75,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return outputCount;
}
- private AggOutcome tooBigFailure(){
+ private AggOutcome tooBigFailure() {
context.fail(new Exception(TOO_BIG_ERROR));
this.outcome = IterOutcome.STOP;
return AggOutcome.CLEANUP_AND_RETURN;
@@ -87,11 +87,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
outcome = IterOutcome.NONE;
return AggOutcome.CLEANUP_AND_RETURN;
}
- try{ // outside loop to ensure that first is set to false after the first run.
+ try { // outside loop to ensure that first is set to false after the first run.
outputCount = 0;
// if we're in the first state, allocate outgoing.
- if(first){
+ if (first) {
allocateOutgoing();
}
@@ -119,8 +119,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
// pick up a remainder batch if we have one.
- if(remainderBatch != null){
- if (!outputToBatch( previousIndex )) return tooBigFailure();
+ if (remainderBatch != null) {
+ if (!outputToBatch( previousIndex )) {
+ return tooBigFailure();
+ }
remainderBatch.clear();
remainderBatch = null;
return setOkAndReturn();
@@ -131,38 +133,56 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (pendingOutput) {
allocateOutgoing();
pendingOutput = false;
- if(EXTRA_DEBUG) logger.debug("Attempting to output remainder.");
- if (!outputToBatch( previousIndex)) return tooBigFailure();
+ if (EXTRA_DEBUG) {
+ logger.debug("Attempting to output remainder.");
+ }
+ if (!outputToBatch( previousIndex)) {
+ return tooBigFailure();
+ }
}
- if(newSchema){
+ if (newSchema) {
return AggOutcome.UPDATE_AGGREGATOR;
}
- if(lastOutcome != null){
+ if (lastOutcome != null) {
outcome = lastOutcome;
return AggOutcome.CLEANUP_AND_RETURN;
}
- outside: while(true){
+ outside: while(true) {
// loop through existing records, adding as necessary.
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ if (EXTRA_DEBUG) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ }
if (previousIndex == -1) {
- if (EXTRA_DEBUG) logger.debug("Adding the initial row's keys and values.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Adding the initial row's keys and values.");
+ }
addRecordInc(currentIndex);
}
else if (isSame( previousIndex, currentIndex )) {
- if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Values were found the same, adding.");
+ }
addRecordInc(currentIndex);
} else {
- if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Values were different, outputting previous batch.");
+ }
if (outputToBatch(previousIndex)) {
- if(EXTRA_DEBUG) logger.debug("Output successful.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Output successful.");
+ }
addRecordInc(currentIndex);
} else {
- if(EXTRA_DEBUG) logger.debug("Output failed.");
- if(outputCount == 0) return tooBigFailure();
+ if (EXTRA_DEBUG) {
+ logger.debug("Output failed.");
+ }
+ if (outputCount == 0) {
+ return tooBigFailure();
+ }
// mark the pending output but move forward for the next cycle.
pendingOutput = true;
@@ -178,23 +198,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
InternalBatch previous = null;
- try{
- while(true){
+ try {
+ while (true) {
if (previous != null) {
previous.clear();
}
previous = new InternalBatch(incoming);
IterOutcome out = outgoing.next(0, incoming);
- if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
- switch(out){
+ if (EXTRA_DEBUG) {
+ logger.debug("Received IterOutcome of {}", out);
+ }
+ switch (out) {
case NONE:
done = true;
lastOutcome = out;
if (first && addedRecordCount == 0) {
return setOkAndReturn();
- } else if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
- if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+ } else if(addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+ remainderBatch = previous;
+ }
+ if (EXTRA_DEBUG) {
+ logger.debug("Received no more batches, returning.");
+ }
return setOkAndReturn();
}else{
if (first && out == IterOutcome.OK) {
@@ -204,17 +230,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.CLEANUP_AND_RETURN;
}
-
-
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
case OK_NEW_SCHEMA:
- if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
- if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
+ if (addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+ remainderBatch = previous;
+ }
+ if (EXTRA_DEBUG) {
+ logger.debug("Wrote out end of previous batch, returning.");
+ }
newSchema = true;
return setOkAndReturn();
}
@@ -222,21 +252,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.UPDATE_AGGREGATOR;
case OK:
resetIndex();
- if(incoming.getRecordCount() == 0){
+ if (incoming.getRecordCount() == 0) {
continue;
- }else{
- if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){
- if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+ } else {
+ if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("New value was same as last value of previous batch, adding.");
+ }
addRecordInc(currentIndex);
previousIndex = currentIndex;
incIndex();
- if(EXTRA_DEBUG) logger.debug("Continuing outside");
+ if (EXTRA_DEBUG) {
+ logger.debug("Continuing outside");
+ }
continue outside;
- }else{ // not the same
- if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+ } else { // not the same
+ if (EXTRA_DEBUG) {
+ logger.debug("This is not the same as the previous, add record and continue outside.");
+ }
previousIndex = currentIndex;
- if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+ if (addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
remainderBatch = previous;
return setOkAndReturn();
}
@@ -251,72 +287,78 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.CLEANUP_AND_RETURN;
}
-
}
- }finally{
+ } finally {
// make sure to clear previous if we haven't saved it.
- if(remainderBatch == null && previous != null){
+ if (remainderBatch == null && previous != null) {
previous.clear();
}
}
}
- }finally{
- if(first) first = !first;
+ } finally {
+ if (first) {
+ first = !first;
+ }
}
}
-
- private final void incIndex(){
+ private final void incIndex() {
underlyingIndex++;
- if(underlyingIndex >= incoming.getRecordCount()){
+ if (underlyingIndex >= incoming.getRecordCount()) {
currentIndex = Integer.MAX_VALUE;
return;
}
currentIndex = getVectorIndex(underlyingIndex);
}
- private final void resetIndex(){
+ private final void resetIndex() {
underlyingIndex = -1;
incIndex();
}
- private final AggOutcome setOkAndReturn(){
- if(first){
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
- for(VectorWrapper<?> v : outgoing){
+ for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(outputCount);
}
return AggOutcome.RETURN_OUTCOME;
}
- private final boolean outputToBatch(int inIndex){
+ private final boolean outputToBatch(int inIndex) {
- if(!outputRecordKeys(inIndex, outputCount)){
- if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount);
+ if (!outputRecordKeys(inIndex, outputCount)) {
+ if(EXTRA_DEBUG) {
+ logger.debug("Failure while outputting keys {}", outputCount);
+ }
return false;
}
- if(!outputRecordValues(outputCount)){
- if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount);
+ if (!outputRecordValues(outputCount)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("Failure while outputting values {}", outputCount);
+ }
return false;
}
- if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount);
+ if (EXTRA_DEBUG) {
+ logger.debug("{} values output successfully", outputCount);
+ }
resetValues();
outputCount++;
addedRecordCount = 0;
return true;
}
- private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){
+ private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
&& outputRecordValues(outIndex) //
&& resetValues();
- if(success){
+ if (success) {
resetValues();
outputCount++;
addedRecordCount = 0;
@@ -325,17 +367,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return success;
}
- private void addRecordInc(int index){
+ private void addRecordInc(int index) {
addRecord(index);
this.addedRecordCount++;
}
@Override
- public void cleanup(){
- if(remainderBatch != null) remainderBatch.clear();
+ public void cleanup() {
+ if (remainderBatch != null) {
+ remainderBatch.clear();
+ }
}
-
public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 8f5f29b..96da00b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -28,8 +28,8 @@ public interface StreamingAggregator {
public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
public static enum AggOutcome {
- RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
- }
+ RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+ }
public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 195d249..f77407e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -53,23 +53,23 @@ public class ChainedHashTable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
private static final GeneratorMapping KEY_MATCH_BUILD =
- GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping KEY_MATCH_PROBE =
- GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_BUILD =
- GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
+ GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_PROBE =
- GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
+ GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping SET_VALUE =
- GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping OUTPUT_KEYS =
@@ -138,8 +138,12 @@ public class ChainedHashTable {
int i = 0;
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- if (expr == null) continue;
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+ if (expr == null) {
+ continue;
+ }
keyExprsBuild[i] = expr;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
@@ -155,8 +159,12 @@ public class ChainedHashTable {
i = 0;
for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- if (expr == null) continue;
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+ if (expr == null) {
+ continue;
+ }
keyExprsProbe[i] = expr;
i++;
}
@@ -293,4 +301,3 @@ public class ChainedHashTable {
}
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b03880c..6024523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable {
assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
assert (incomingRowIdx < HashTable.BATCH_SIZE);
- if (isProbe)
+ if (isProbe) {
match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
- else
+ } else {
match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
+ }
if (! match) {
currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch);
@@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable {
maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
- if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+ if (EXTRA_DEBUG) {
+ logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+ }
return true;
}
@@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
} else {
// follow the new table's hash chain until we encounter empty slot. Note that the hash chain could
@@ -245,7 +250,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
break;
} else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
@@ -253,7 +260,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
break;
}
@@ -381,11 +390,19 @@ public abstract class HashTableTemplate implements HashTable {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
- if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
- if (initialCap <= 0) throw new IllegalArgumentException("The initial capacity must be greater than 0");
- if (initialCap > MAXIMUM_CAPACITY) throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+ if (loadf <= 0 || Float.isNaN(loadf)) {
+ throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
+ }
+ if (initialCap <= 0) {
+ throw new IllegalArgumentException("The initial capacity must be greater than 0");
+ }
+ if (initialCap > MAXIMUM_CAPACITY) {
+ throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+ }
- if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+ if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) {
+ throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+ }
this.htConfig = htConfig;
this.context = context;
@@ -397,8 +414,9 @@ public abstract class HashTableTemplate implements HashTable {
// round up the initial capacity to nearest highest power of 2
tableSize = roundUpToPowerOf2(initialCap);
- if (tableSize > MAXIMUM_CAPACITY)
+ if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
+ }
threshold = (int) Math.ceil(tableSize * loadf);
@@ -500,7 +518,9 @@ public abstract class HashTableTemplate implements HashTable {
currentIdx = freeIndex++;
addBatchIfNeeded(currentIdx);
- if (EXTRA_DEBUG) logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+ if (EXTRA_DEBUG) {
+ logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+ }
if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
// update the start index array
@@ -543,14 +563,16 @@ public abstract class HashTableTemplate implements HashTable {
currentIdx = freeIndex++;
addBatchIfNeeded(currentIdx);
- if (EXTRA_DEBUG) logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+ if (EXTRA_DEBUG) {
+ logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+ }
if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
htIdxHolder.value = currentIdx;
return PutStatus.KEY_ADDED;
- }
- else
+ } else {
return PutStatus.PUT_FAILED;
+ }
}
return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED ;
@@ -618,7 +640,9 @@ public abstract class HashTableTemplate implements HashTable {
if (currentIdx >= totalBatchSize) {
BatchHolder bh = addBatchHolder();
- if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+ if (EXTRA_DEBUG) {
+ logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+ }
return bh;
}
else {
@@ -638,12 +662,15 @@ public abstract class HashTableTemplate implements HashTable {
// in the new table.. the metadata consists of the startIndices, links and hashValues.
// Note that the keys stored in the BatchHolders are not moved around.
private void resizeAndRehashIfNeeded() {
- if (numEntries < threshold)
+ if (numEntries < threshold) {
return;
+ }
long t0 = System.currentTimeMillis();
- if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+ if (EXTRA_DEBUG) {
+ logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+ }
// If the table size is already MAXIMUM_CAPACITY, don't resize
// the table, but set the threshold to Integer.MAX_VALUE such that
@@ -656,8 +683,9 @@ public abstract class HashTableTemplate implements HashTable {
int newSize = 2 * tableSize;
tableSize = roundUpToPowerOf2(newSize);
- if (tableSize > MAXIMUM_CAPACITY)
+ if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
+ }
// set the new threshold based on the new table size and load factor
threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
@@ -717,5 +745,3 @@ public abstract class HashTableTemplate implements HashTable {
protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index bf00194..f1fcce0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -79,7 +79,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
protected void doWork() {
int recordCount = incoming.getRecordCount();
filter.filterBatch(recordCount);
-// for(VectorWrapper<?> v : container){
+// for (VectorWrapper<?> v : container) {
// ValueVector.Mutator m = v.getValueVector().getMutator();
// m.setValueCount(recordCount);
// }
@@ -88,8 +88,12 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
@Override
public void cleanup() {
- if(sv2 != null) sv2.clear();
- if(sv4 != null) sv4.clear();
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (sv4 != null) {
+ sv4.clear();
+ }
super.cleanup();
}
@@ -100,7 +104,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
sv2.clear();
}
- switch(incoming.getSchema().getSelectionVectorMode()){
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case NONE:
sv2 = new SelectionVector2(oContext.getAllocator());
this.filter = generateSV2Filterer();
@@ -137,13 +141,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
cg.addExpr(new ReturnValueExpression(expr));
-// for(VectorWrapper<?> i : incoming){
+// for (VectorWrapper<?> i : incoming) {
// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
// container.add(v);
// allocators.add(getAllocator4(v));
@@ -177,13 +181,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
cg.addExpr(new ReturnValueExpression(expr));
- for(VectorWrapper<?> v : incoming){
+ for (VectorWrapper<?> v : incoming) {
TransferPair pair = v.getValueVector().getTransferPair();
container.add(pair.getTo());
transfers.add(pair);
@@ -202,5 +206,4 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 15044b8..2a08c05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -457,8 +457,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return hj;
}
- private void allocateVectors(){
- for(VectorWrapper<?> v : container){
+ private void allocateVectors() {
+ for(VectorWrapper<?> v : container) {
v.getValueVector().allocateNew();
}
}
@@ -472,7 +472,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
private void updateStats(HashTable htable) {
- if(htable == null) return;
+ if (htable == null) {
+ return;
+ }
htable.getStats(htStats);
this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
@@ -488,7 +490,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
@Override
public void cleanup() {
- if(hjHelper != null){
+ if (hjHelper != null) {
hjHelper.clear();
}
@@ -504,4 +506,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
left.cleanup();
right.cleanup();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 785deae..133289e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -94,11 +94,13 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
boolean success = true;
while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
- if(success){
+ if (success) {
recordsProcessed++;
outputRecords++;
- }else{
- if(outputRecords == 0) throw new IllegalStateException("Too big to fail.");
+ } else {
+ if (outputRecords == 0) {
+ throw new IllegalStateException("Too big to fail.");
+ }
break;
}
}
@@ -166,11 +168,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
&& projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
+ if (!success) {
// we failed to project. redo this record.
getNextRecord = false;
return;
- }else{
+ } else {
outputRecords++;
/* Projected single row from the build side with matching key but there
@@ -182,8 +184,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
* from the probe side. Drain the next row in the probe side.
*/
recordsProcessed++;
- }
- else {
+ } else {
/* There is more than one row with the same key on the build side
* don't drain more records from the probe side till we have projected
* all the rows with this key
@@ -197,10 +198,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
// If we have a left outer join, project the keys
if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
boolean success = projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
- if(outputRecords == 0){
+ if (!success) {
+ if (outputRecords == 0) {
throw new IllegalStateException("Record larger than single batch.");
- }else{
+ } else {
// we've output some records but failed to output this one. return and wait for next call.
return;
}
@@ -214,10 +215,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
hjHelper.setRecordMatched(currentCompositeIdx);
boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
&& projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
- if(outputRecords == 0){
+ if (!success) {
+ if (outputRecords == 0) {
throw new IllegalStateException("Record larger than single batch.");
- }else{
+ } else {
// we've output some records but failed to output this one. return and wait for next call.
return;
}
@@ -264,5 +265,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
@Named("outgoing") RecordBatch outgoing);
public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+
public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+
}