Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:48 UTC
[10/27] Initial Parquet commit. Supports INT, LONG, FLOAT, DOUBLE, and
distributed scheduling.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index bad3a03..a40031e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -54,6 +54,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
this.incoming = batch;
+ assert(incoming != null);
this.handle = context.getHandle();
this.recMajor = config.getOppositeMajorFragmentId();
this.tunnel = context.getCommunicator().getTunnel(config.getDestination());
@@ -74,12 +75,14 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
case NONE:
FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
+ b2.release();
return false;
case OK_NEW_SCHEMA:
case OK:
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
+ batch.release();
return true;
case NOT_YET:
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 1148c93..b04e154 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
public class PhysicalPlanReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -45,7 +47,8 @@ public class PhysicalPlanReader {
private final ObjectReader operatorReader;
private final ObjectReader logicalPlanReader;
- public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint) {
+ public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint,
+ final StorageEngineRegistry engineRegistry) {
// Endpoint serializer/deserializer.
SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
@@ -58,6 +61,7 @@ public class PhysicalPlanReader {
mapper.registerModule(deserModule);
mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
InjectableValues injectables = new InjectableValues.Std() //
+ .addValue(StorageEngineRegistry.class, engineRegistry) //
.addValue(DrillbitEndpoint.class, endpoint); //
this.mapper = mapper;
@@ -66,6 +70,14 @@ public class PhysicalPlanReader {
this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables);
}
+ // TODO - we do not want the storage engine registry generated here in production; this was created to keep old
+ // tests passing. This constructor should be removed and the tests updated to use the constructor
+ // that takes a storage engine registry.
+ @Deprecated
+ public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint) {
+ this(config, mapper, endpoint, null);
+ }
+
public String writeJson(PhysicalOperator op) throws JsonProcessingException{
return mapper.writeValueAsString(op);
}
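The registry is threaded into deserialization through Jackson's InjectableValues, so any operator whose @JsonCreator declares a @JacksonInject parameter (as ParquetGroupScan does later in this commit) receives the registry at parse time instead of from the JSON. A minimal, self-contained sketch of that mechanism; Registry and Op here are stand-ins, not Drill classes:

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InjectableExample {
      static class Registry { }                        // stand-in for StorageEngineRegistry

      static class Op {
        final String name;
        final Registry registry;

        @JsonCreator
        public Op(@JsonProperty("name") String name,
                  @JacksonInject Registry registry) {  // supplied by the reader, not the JSON
          this.name = name;
          this.registry = registry;
        }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        InjectableValues injectables = new InjectableValues.Std()
            .addValue(Registry.class, new Registry());
        Op op = mapper.reader(Op.class).with(injectables)
            .readValue("{\"name\":\"scan\"}");
        System.out.println(op.name + " injected=" + (op.registry != null));
      }
    }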
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index 4188435..d3c8cee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
/**
* Responsible for breaking a plan into its constituent Fragments.
@@ -42,7 +43,13 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
exchange.getChild().accept(this, next);
return value;
}
-
+
+ @Override
+ public Fragment visitSubScan(SubScan subScan, Fragment value) throws FragmentSetupException {
+ // TODO - implement this
+ return super.visitOp(subScan, value);
+ }
+
@Override
public Fragment visitOp(PhysicalOperator op, Fragment value) throws FragmentSetupException{
// logger.debug("Visiting Other {}", op);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index da71271..ab91d76 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -22,11 +22,8 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
import com.google.common.collect.Lists;
@@ -53,8 +50,14 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
}
@Override
- public PhysicalOperator visitScan(Scan<?> scan, IndexedFragmentNode iNode) throws ExecutionSetupException {
- return scan.getSpecificScan(iNode.getMinorFragmentId());
+ public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException {
+ return groupScan.getSpecificScan(iNode.getMinorFragmentId());
+ }
+
+ @Override
+ public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) throws ExecutionSetupException {
+ // TODO - implement this
+ return super.visitOp(subScan, value);
}
@Override
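The Scan split is the heart of this commit: a GroupScan describes the whole read and is what the parallelizer works on, while each minor fragment receives its own slice through getSpecificScan(minorFragmentId), as visitGroupScan above now does. A hedged sketch of the mapping scheme behind that call, with stand-in types (Drill's real signatures use GroupScan/SubScan; the 'mappings' array mirrors the field ParquetGroupScan declares later in this commit):

    import java.util.ArrayList;
    import java.util.List;

    class ExampleGroupScan {
      private List<String>[] mappings;   // one list of read entries per minor fragment

      @SuppressWarnings("unchecked")
      void applyAssignments(int fragmentCount, List<String> allEntries) {
        mappings = new List[fragmentCount];
        for (int i = 0; i < fragmentCount; i++) {
          mappings[i] = new ArrayList<>();
        }
        // round-robin the read entries across minor fragments
        for (int i = 0; i < allEntries.size(); i++) {
          mappings[i % fragmentCount].add(allEntries.get(i));
        }
      }

      List<String> getSpecificScan(int minorFragmentId) {
        return mappings[minorFragmentId];   // this fragment's slice of the read
      }
    }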
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index e3bcff0..2ccb17c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -127,9 +127,11 @@ public class SimpleParallelizer {
.build();
if (isRootNode) {
+ logger.debug("Root fragment {}", fragment);
rootFragment = fragment;
rootOperator = root;
} else {
+ logger.debug("Remote fragment {}", fragment);
fragments.add(fragment);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index af8ec04..2ef5295 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -17,11 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.planner.fragment;
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.HasAffinity;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
@@ -75,10 +72,16 @@ public class StatsCollector {
}
@Override
- public Void visitScan(Scan<?> scan, Wrapper wrapper) {
+ public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
Stats stats = wrapper.getStats();
- stats.addMaxWidth(scan.getReadEntries().size());
- return super.visitScan(scan, wrapper);
+ stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
+ return super.visitGroupScan(groupScan, wrapper);
+ }
+
+ @Override
+ public Void visitSubScan(SubScan subScan, Wrapper value) throws RuntimeException {
+ // TODO - implement this
+ return super.visitOp(subScan, value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 0dfcb62..d5a24b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -25,12 +25,10 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.NotImplementedException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -113,9 +111,15 @@ public class Wrapper {
}
@Override
- public Void visitScan(Scan<?> scan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
- scan.applyAssignments(value);
- return super.visitScan(scan, value);
+ public Void visitGroupScan(GroupScan groupScan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ groupScan.applyAssignments(value);
+ return super.visitGroupScan(groupScan, value);
+ }
+
+ @Override
+ public Void visitSubScan(SubScan subScan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ // TODO - implement this
+ return visitOp(subScan, value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index c19065d..964ef5c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -55,5 +55,11 @@ public class FragmentWritableBatch{
return header;
}
+ public void release(){
+ for(ByteBuf b : buffers){
+ b.release();
+ }
+ }
+
}
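release() exists because Netty buffers are reference counted: once SingleSenderRootExec has handed the batch to the tunnel (see the b2.release()/batch.release() calls above), the sender must drop its own references or the memory never returns to the allocator. The contract in isolation, with plain Netty types:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class RefCountExample {
      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);   // starts with refCnt == 1
        buf.retain();                        // refCnt == 2, e.g. taken by the wire
        buf.release();                       // sender drops its reference, refCnt == 1
        buf.release();                       // last reference released, memory reclaimed
        System.out.println(buf.refCnt());    // 0
      }
    }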
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 593c28c..4d47404 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,9 +41,9 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
private VectorContainer container = new VectorContainer();
private final BufferAllocator allocator;
- private int recordCount;
+ private int valueCount;
private BatchSchema schema;
-
+
public RecordBatchLoader(BufferAllocator allocator) {
super();
this.allocator = allocator;
@@ -51,18 +51,18 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
/**
* Load a record batch from a single buffer.
- *
+ *
* @param def
* The definition for the record batch.
* @param buf
* The buffer that holds the data associated with the record batch
* @return Whether or not the schema changed since the previous load.
- * @throws SchemaChangeException
+ * @throws SchemaChangeException
*/
public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException {
// logger.debug("Loading record batch with def {} and data {}", def, buf);
- this.recordCount = def.getRecordCount();
- boolean schemaChanged = false;
+ this.valueCount = def.getRecordCount();
+ boolean schemaChanged = schema == null;
Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
for(VectorWrapper<?> w : container){
@@ -73,7 +73,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
VectorContainer newVectors = new VectorContainer();
List<FieldMetadata> fields = def.getFieldList();
-
+
int bufOffset = 0;
for (FieldMetadata fmd : fields) {
FieldDef fieldDef = fmd.getDef();
@@ -82,23 +82,27 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
container.add(v);
continue;
}
-
+
// if we arrive here, we didn't have a matching vector.
schemaChanged = true;
MaterializedField m = new MaterializedField(fieldDef);
v = TypeHelper.getNewVector(m, allocator);
- v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+ if (fmd.getValueCount() == 0){
+ v.clear();
+ } else {
+ v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+ }
bufOffset += fmd.getBufferLength();
newVectors.add(v);
}
-
+
if(!oldFields.isEmpty()){
schemaChanged = true;
for(ValueVector v : oldFields.values()){
v.close();
}
}
-
+
// rebuild the schema.
SchemaBuilder b = BatchSchema.newBuilder();
for(VectorWrapper<?> v : newVectors){
@@ -132,7 +136,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
// }
public int getRecordCount() {
- return recordCount;
+ return valueCount;
}
public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
@@ -140,7 +144,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
}
public WritableBatch getWritableBatch(){
- return WritableBatch.getBatchNoSVWrap(recordCount, container);
+ return WritableBatch.getBatchNoSVWrap(valueCount, container);
}
@Override
@@ -152,6 +156,6 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
return schema;
}
-
-
+
+
}
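Two behavioral fixes land in load(): a schema change is now reported for the very first batch (schemaChanged starts as schema == null rather than false), and a vector whose metadata says zero values is cleared instead of being loaded from an empty slice. A hedged sketch of the caller's side; the import locations for the proto and exception classes are assumptions:

    import io.netty.buffer.ByteBuf;
    import org.apache.drill.exec.exception.SchemaChangeException;    // package assumed
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; // proto location assumed
    import org.apache.drill.exec.record.RecordBatchLoader;

    class LoaderUsage {
      int consume(BufferAllocator allocator, RecordBatchDef def, ByteBuf data)
          throws SchemaChangeException {
        RecordBatchLoader loader = new RecordBatchLoader(allocator);
        boolean schemaChanged = loader.load(def, data);
        if (schemaChanged) {
          // true on the very first batch, or whenever the fields/types differ
          // from the previous batch; rebuild anything keyed off the schema here
        }
        return loader.getRecordCount();
      }
    }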
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index e84bf37..cac042b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -71,7 +71,6 @@ public class WritableBatch {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
-
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index 6c0158a..8c8b6b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
@@ -28,6 +29,7 @@ import java.util.List;
import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
+import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
@@ -103,10 +105,16 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
cos.writeRawVarint32(rawBodyLength);
cos.flush(); // need to flush so that dbody goes after if cos is caching.
- out.add(buf);
+ CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1);
+ cbb.addComponent(buf);
+ int bufLength = buf.readableBytes();
for(ByteBuf b : msg.dBodies){
- out.add(b);
+ cbb.addComponent(b);
+ bufLength += b.readableBytes();
}
+ cbb.writerIndex(bufLength);
+ out.add(cbb);
+
}else{
cos.flush();
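Instead of adding the header buffer and each data body to the outbound list as separate messages, the encoder now stitches them into one CompositeByteBuf, which presents the components as a single contiguous buffer without copying. Note that addComponent does not advance the writer index, which is why the diff tracks bufLength and sets it explicitly. The same behavior in isolation:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.CompositeByteBuf;
    import io.netty.buffer.Unpooled;

    public class CompositeExample {
      public static void main(String[] args) {
        ByteBuf header = Unpooled.copiedBuffer(new byte[]{1, 2, 3});
        ByteBuf body = Unpooled.copiedBuffer(new byte[]{4, 5});
        CompositeByteBuf cbb = new CompositeByteBuf(header.alloc(), true, 2);
        cbb.addComponent(header);
        cbb.addComponent(body);
        // addComponent leaves the writer index at 0, so set it to the total size
        cbb.writerIndex(header.readableBytes() + body.readableBytes());
        System.out.println(cbb.readableBytes());   // 5
      }
    }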
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index c933594..eb160be 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.store.StorageEngine;
import com.google.common.base.Preconditions;
import com.yammer.metrics.MetricRegistry;
+import org.apache.drill.exec.store.StorageEngineRegistry;
public class DrillbitContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -46,6 +48,7 @@ public class DrillbitContext {
private final BitCom com;
private final DistributedCache cache;
private final DrillbitEndpoint endpoint;
+ private final StorageEngineRegistry storageEngineRegistry;
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) {
super();
@@ -58,7 +61,8 @@ public class DrillbitContext {
this.com = com;
this.cache = cache;
this.endpoint = endpoint;
- this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint);
+ this.storageEngineRegistry = new StorageEngineRegistry(this);
+ this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
}
public DrillbitEndpoint getEndpoint(){
@@ -77,8 +81,8 @@ public class DrillbitContext {
return context.getAllocator();
}
- public StorageEngine getStorageEngine(StorageEngineConfig config){
- throw new UnsupportedOperationException();
+ public StorageEngine getStorageEngine(StorageEngineConfig config) throws SetupException {
+ return storageEngineRegistry.getEngine(config);
}
public NioEventLoopGroup getBitLoopGroup(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index b07f274..bb1f2e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -44,6 +44,7 @@ public class ServiceEngine implements Closeable{
private final UserServer userServer;
private final BitCom bitCom;
private final DrillConfig config;
+ boolean useIP = false;
public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){
this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker);
@@ -53,8 +54,10 @@ public class ServiceEngine implements Closeable{
public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
+ String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
- .setAddress(InetAddress.getLocalHost().getHostAddress())
+ .setAddress(address)
+ //.setAddress("localhost")
.setUserPort(userPort)
.build();
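The advertised address switches from the raw IP to the canonical hostname, with a useIP field left as a debugging toggle; presumably this is so endpoint addresses line up with the hostnames HDFS reports for block locations, which AffinityCalculator.buildEndpointMap (added below) uses as map keys. The JDK distinction in isolation:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class AddressExample {
      public static void main(String[] args) throws UnknownHostException {
        InetAddress local = InetAddress.getLocalHost();
        System.out.println(local.getHostAddress());         // e.g. 192.168.1.10
        System.out.println(local.getCanonicalHostName());   // e.g. node1.example.com
      }
    }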
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index 80704fa..9c48052 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -24,6 +24,8 @@ import java.util.List;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.ListMultimap;
@@ -48,7 +50,7 @@ public class AbstractStorageEngine implements StorageEngine{
}
@Override
- public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+ public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
new file mode 100644
index 0000000..b4092cc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -0,0 +1,112 @@
+package org.apache.drill.exec.store;
+
+
+import com.google.common.collect.ImmutableRangeMap;
+import com.google.common.collect.Range;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.*;
+
+public class AffinityCalculator {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
+
+
+ BlockLocation[] blocks;
+ ImmutableRangeMap<Long,BlockLocation> blockMap;
+ FileSystem fs;
+ String fileName;
+ Collection<DrillbitEndpoint> endpoints;
+ HashMap<String,DrillbitEndpoint> endPointMap;
+
+ public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
+ this.fs = fs;
+ this.fileName = fileName;
+ this.endpoints = endpoints;
+ buildBlockMap();
+ buildEndpointMap();
+ }
+
+ private void buildBlockMap() {
+ try {
+ FileStatus file = fs.getFileStatus(new Path(fileName));
+ long tC = System.nanoTime();
+ blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
+ long tD = System.nanoTime();
+ logger.debug("Block locations: {}", blocks);
+ logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6);
+ } catch (IOException ioe) { throw new RuntimeException(ioe); }
+ long tA = System.nanoTime();
+ ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
+ for (BlockLocation block : blocks) {
+ long start = block.getOffset();
+ long end = start + block.getLength();
+ Range<Long> range = Range.closedOpen(start, end);
+ blockMapBuilder = blockMapBuilder.put(range, block);
+ }
+ blockMap = blockMapBuilder.build();
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
+ }
+ /**
+ * Calculate, for the given row group, how many of its bytes reside on each drillbit endpoint.
+ *
+ * @param entry the row group whose per-endpoint byte counts should be set
+ */
+ public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
+ long tA = System.nanoTime();
+ HashMap<String,Long> hostMap = new HashMap<>();
+ long start = entry.getStart();
+ long end = start + entry.getLength();
+ Range<Long> entryRange = Range.closedOpen(start, end);
+ ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
+ for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
+ String[] hosts = null;
+ Range<Long> blockRange = e.getKey();
+ try {
+ hosts = e.getValue().getHosts();
+ } catch (IOException ioe) { /*TODO Handle this exception */}
+ Range<Long> intersection = entryRange.intersection(blockRange);
+ long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+ for (String host : hosts) {
+ if (hostMap.containsKey(host)) {
+ hostMap.put(host, hostMap.get(host) + bytes);
+ } else {
+ hostMap.put(host, bytes);
+ }
+ }
+ }
+ HashMap<DrillbitEndpoint,Long> ebs = new HashMap<>();
+ try {
+ for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
+ String host = hostEntry.getKey();
+ Long bytes = hostEntry.getValue();
+ DrillbitEndpoint d = getDrillBitEndpoint(host);
+ if (d != null ) ebs.put(d, bytes);
+ }
+ } catch (NullPointerException n) { /* TODO - handle this; an NPE here is silently swallowed */ }
+ entry.setEndpointBytes(ebs);
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
+ }
+
+ private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
+ return endPointMap.get(hostName);
+ }
+
+ private void buildEndpointMap() {
+ long tA = System.nanoTime();
+ endPointMap = new HashMap<String, DrillbitEndpoint>();
+ for (DrillbitEndpoint d : endpoints) {
+ String hostName = d.getAddress();
+ endPointMap.put(hostName, d);
+ }
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
+ }
+}
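The affinity math overlaps each row group's [start, end) byte range with the HDFS block ranges and credits every host holding a block with the length of the intersection. That computation, self-contained with the same Guava types (the block sizes and host names are made up):

    import com.google.common.collect.ImmutableRangeMap;
    import com.google.common.collect.Range;
    import java.util.Map;

    public class OverlapExample {
      public static void main(String[] args) {
        // two 128-byte "blocks", keyed by byte range, valued by hosting node
        ImmutableRangeMap<Long, String> blocks =
            new ImmutableRangeMap.Builder<Long, String>()
                .put(Range.closedOpen(0L, 128L), "host-a")
                .put(Range.closedOpen(128L, 256L), "host-b")
                .build();
        Range<Long> rowGroup = Range.closedOpen(100L, 200L);   // spans both blocks
        for (Map.Entry<Range<Long>, String> e :
            blocks.subRangeMap(rowGroup).asMapOfRanges().entrySet()) {
          Range<Long> overlap = e.getKey();   // already clipped to the row group
          long bytes = overlap.upperEndpoint() - overlap.lowerEndpoint();
          System.out.println(e.getValue() + " holds " + bytes + " bytes");
          // prints: host-a holds 28 bytes, host-b holds 72 bytes
        }
      }
    }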
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
index 4884b7a..b24521d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -23,6 +23,8 @@ import java.util.List;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.ListMultimap;
@@ -40,15 +42,15 @@ public interface StorageEngine {
public List<QueryOptimizerRule> getOptimizerRules();
/**
- * Get the set of read entries required for a particular Scan (read) node. This is somewhat analogous to traditional
- * MapReduce. The difference is, this is the most granular split paradigm.
+ * Get the physical scan operator populated with a set of read entries required for the particular GroupScan (read) node.
+ * This is somewhat analogous to traditional MapReduce. The difference is that this is the most granular split paradigm.
*
* @param scan
* The configured scan entries.
* @return
* @throws IOException
*/
- public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+ public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException;
/**
* Get the set of Drillbit endpoints that are available for each read entry. Note that it is possible for a read entry
@@ -104,10 +106,6 @@ public interface StorageEngine {
*/
public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException;
- public interface ReadEntry {
- public Cost getCostEstimate();
- }
-
public interface WriteEntry {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index eef878e..26504a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -38,19 +38,19 @@ public class StorageEngineRegistry {
private DrillbitContext context;
public StorageEngineRegistry(DrillbitContext context){
+ init(context.getConfig());
this.context = context;
- setup(context.getConfig());
}
@SuppressWarnings("unchecked")
- public void setup(DrillConfig config){
+ public void init(DrillConfig config){
Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
logger.debug("Loading storage engines {}", engines);
for(Class<? extends StorageEngine> engine: engines){
int i =0;
for(Constructor<?> c : engine.getConstructors()){
Class<?>[] params = c.getParameterTypes();
- if(params.length != 2 || params[1] == DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+ if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
logger.debug("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
continue;
}
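The one-character fix above matters: the old condition skipped a constructor when its second parameter was DrillbitContext, which is exactly the shape the registry is scanning for, so matching (StorageEngineConfig, DrillbitContext) constructors were wrongly rejected. The shape check in isolation; Engine, EngineConfig, and Context are stand-ins for the Drill types:

    import java.lang.reflect.Constructor;

    public class CtorScanExample {
      static class Context { }
      static class EngineConfig { }
      public static class Engine {
        public Engine(EngineConfig config, Context context) { }   // usable
        public Engine(String bogus, int alsoBogus) { }            // skipped
      }

      public static void main(String[] args) {
        for (Constructor<?> c : Engine.class.getConstructors()) {
          Class<?>[] params = c.getParameterTypes();
          boolean usable = params.length == 2
              && EngineConfig.class.isAssignableFrom(params[0])
              && params[1] == Context.class;   // the skip test uses '!=', as fixed above
          System.out.println(c + " usable=" + usable);
        }
      }
    }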
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index d2ad72a..7cbea57 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -27,10 +27,11 @@ public class VectorHolder {
private ValueVector vector;
private int currentLength;
- VectorHolder(ValueVector vector) {
- this.length = vector.getValueCapacity();
- this.vector = vector;
- }
+ public VectorHolder(int length, ValueVector vector) {
+ this.length = length;
+ this.vector = vector;
+ this.mutator = vector.getMutator();
+ }
public ValueVector getValueVector() {
return vector;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
new file mode 100644
index 0000000..c85d4aa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+public final class BitReader extends ColumnReader {
+
+ byte currentByte;
+ byte nextByte;
+
+ BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v) {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ }
+
+ @Override
+ protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+
+ recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
+ - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ readStartInBytes = pageReadStatus.readPosInBytes;
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+ bytes = pageReadStatus.pageDataByteArray;
+ // standard read, using memory mapping
+ if (pageReadStatus.bitShift == 0) {
+ ((BaseDataValueVector) valueVecHolder.getValueVector()).getData().writeBytes(bytes,
+ (int) readStartInBytes, (int) readLength);
+ } else { // read in individual values, because a bit shift is necessary based on where the last page or batch ended
+
+ vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+ nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
+ readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
+ //recordsReadInThisIteration -= (8 - pageReadStatus.bitShift);
+
+ int i = 0;
+ // read individual bytes with appropriate shifting
+ for (; i <= (int) readLength; i++) {
+ currentByte = nextByte;
+ currentByte = (byte) (currentByte >>> pageReadStatus.bitShift);
+ // mask the bits about to be added from the next byte
+ currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReadStatus.bitShift - 1]);
+ // if we are not on the last byte
+ if ((int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i < pageReadStatus.byteLength) {
+ // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
+ nextByte = bytes[(int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i];
+ currentByte = (byte) (currentByte | nextByte
+ << (8 - pageReadStatus.bitShift)
+ & ParquetRecordReader.endBitMasks[8 - pageReadStatus.bitShift - 1]);
+ }
+ vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
+ }
+ vectorData.setIndex(0, (valuesReadInCurrentPass / 8)
+ + (int) readLength - 1);
+ vectorData.capacity(vectorData.writerIndex() + 1);
+ }
+
+ // if the values in this page did not end on a byte boundary, store the number of bits the next page must be
+ // shifted by to read all of the values into the vector without leaving space
+ if (readLengthInBits % 8 != 0) {
+ pageReadStatus.bitShift = (int) readLengthInBits % 8;
+ } else {
+ pageReadStatus.bitShift = 0;
+ }
+ }
+}
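The shifting branch exists because a page (or batch) of BIT values can end mid-byte, so the next read has to stitch the leftover low-order bits onto the following bytes. A standalone sketch of that realignment, assuming LSB-first bit packing as in Parquet's plain BOOLEAN encoding, with the masks computed inline instead of Drill's precomputed startBitMasks/endBitMasks tables:

    public class BitRealignExample {
      // realign bit-packed data whose logical start sits 'shift' bits (0-7) into in[0]
      static byte[] realign(byte[] in, int shift) {
        byte[] out = new byte[in.length];
        for (int i = 0; i < in.length; i++) {
          int cur = (in[i] & 0xFF) >>> shift;                      // tail bits of this byte
          int next = (i + 1 < in.length) ? (in[i + 1] & 0xFF) : 0; // head bits of the next
          out[i] = (byte) ((cur | (next << (8 - shift))) & 0xFF);
        }
        return out;
      }

      public static void main(String[] args) {
        byte[] in = { (byte) 0b10110100, (byte) 0b00000110 };
        // drop the 3 already-consumed low bits of in[0]; bits 3..10 land in out[0]
        byte[] out = realign(in, 3);
        System.out.println(Integer.toBinaryString(out[0] & 0xFF));   // 11010110
      }
    }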
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
new file mode 100644
index 0000000..8b4f760
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.store.VectorHolder;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+
+public abstract class ColumnReader {
+ // Value Vector for this column
+ VectorHolder valueVecHolder;
+ // column description from the parquet library
+ ColumnDescriptor columnDescriptor;
+ // metadata of the column, from the parquet library
+ ColumnChunkMetaData columnChunkMetaData;
+ // status information on the current page
+ PageReadStatus pageReadStatus;
+
+ long readPositionInBuffer;
+
+ int compressedSize;
+
+ // quick reference to see if the field is fixed length (as this requires an instanceof)
+ boolean isFixedLength;
+ // counter for the total number of values read from one or more pages
+ // when a batch is filled all of these values should be the same for each column
+ int totalValuesRead;
+ // counter for the values that have been read in this pass (a single call to the next() method)
+ int valuesReadInCurrentPass;
+ // length of single data value in bits, if the length is fixed
+ int dataTypeLengthInBits;
+ int bytesReadInCurrentPass;
+ ParquetRecordReader parentReader;
+
+ ByteBuf vectorData;
+
+ // variables for a single read pass
+ long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
+ byte[] bytes;
+
+ ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v){
+ this.parentReader = parentReader;
+ if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, (BaseDataValueVector) v);
+ else valueVecHolder = new VectorHolder(5000, (BaseDataValueVector) v);
+
+ columnDescriptor = descriptor;
+ this.columnChunkMetaData = columnChunkMetaData;
+ isFixedLength = fixedLength;
+
+ pageReadStatus = new PageReadStatus(this, parentReader.getRowGroupIndex(), parentReader.getBufferWithAllData());
+
+ if (parentReader.getRowGroupIndex() != 0) readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset() - 4;
+ else readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset();
+
+ if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
+ }
+ }
+
+ public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+ readStartInBytes = 0;
+ readLength = 0;
+ readLengthInBits = 0;
+ recordsReadInThisIteration = 0;
+ vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+ do {
+ // if no page has been read, or all of the records have been read out of a page, read the next one
+ if (pageReadStatus.currentPage == null
+ || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
+ totalValuesRead += pageReadStatus.valuesRead;
+ if (!pageReadStatus.next()) {
+ break;
+ }
+ }
+
+ readField( recordsToReadInThisPass, firstColumnStatus);
+
+ valuesReadInCurrentPass += recordsReadInThisIteration;
+ totalValuesRead += recordsReadInThisIteration;
+ pageReadStatus.valuesRead += recordsReadInThisIteration;
+ if (readStartInBytes + readLength >= pageReadStatus.byteLength) {
+ pageReadStatus.next();
+ } else {
+ pageReadStatus.readPosInBytes = readStartInBytes + readLength;
+ }
+ }
+ while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
+ ((BaseDataValueVector) valueVecHolder.getValueVector()).getMutator().setValueCount(
+ valuesReadInCurrentPass);
+ }
+
+ protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
new file mode 100644
index 0000000..355405b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+public class FixedByteAlignedReader extends ColumnReader {
+
+ FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v) {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+
+ recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
+ - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ readStartInBytes = pageReadStatus.readPosInBytes;
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+ bytes = pageReadStatus.pageDataByteArray;
+ // vectorData is assigned by the superclass read loop method
+ vectorData.writeBytes(bytes,
+ (int) readStartInBytes, (int) readLength);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
new file mode 100644
index 0000000..29d9cc7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import parquet.bytes.BytesInput;
+import parquet.column.page.Page;
+import parquet.column.page.PageReader;
+import parquet.format.PageHeader;
+
+import java.io.IOException;
+
+import static parquet.format.Util.readPageHeader;
+
+// class to keep track of the read position of variable length columns
+public final class PageReadStatus {
+
+ ColumnReader parentColumnReader;
+
+ // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
+ Page currentPage;
+ // buffer to store bytes of current page, set to max size of parquet page
+ byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+ PageReader pageReader;
+ // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
+ long readPosInBytes;
+ // bit shift needed for the next page if the last one did not line up with a byte boundary
+ int bitShift;
+ // storage space for extra bits at the end of a page if they did not line up with a byte boundary
+ // prevents the need to keep the entire last page, as these bits need to be added to the next batch
+ //byte extraBits;
+ // the number of values read out of the last page
+ int valuesRead;
+ int byteLength;
+ int rowGroupIndex;
+ // buffer with the compressed data of an entire row group
+ ByteBuf bufferWithAllData;
+
+ PageReadStatus(ColumnReader parentStatus, int rowGroupIndex, ByteBuf bufferWithAllData){
+ this.parentColumnReader = parentStatus;
+ this.rowGroupIndex = rowGroupIndex;
+ }
+
+ /**
+ * Grab the next page.
+ *
+ * @return - if another page was present
+ * @throws java.io.IOException
+ */
+ public boolean next() throws IOException {
+
+ int shift = 0;
+ if (rowGroupIndex == 0) shift = 0;
+ else shift = 4;
+ // the first ROW GROUP has a different endpoint, because there are four bytes at the beginning of the file ("PAR1")
+ if (parentColumnReader.readPositionInBuffer + shift == parentColumnReader.columnChunkMetaData.getFirstDataPageOffset() + parentColumnReader.columnChunkMetaData.getTotalSize()){
+ return false;
+ }
+ // TODO - in the Parquet JIRA, Steven posted a stack trace for an error on a row group with 3 values in it.
+ // The Math.min against the end of the buffer should fix it, but results are not coming back yet; leaving it here
+ // for now because it is needed, though there might still be a problem with it.
+ ByteBufInputStream f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
+ (int) parentColumnReader.readPositionInBuffer,
+ Math.min(50, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer)));
+ int before = f.available();
+ PageHeader pageHeader = readPageHeader(f);
+ int length = before - f.available();
+ f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
+ (int) parentColumnReader.readPositionInBuffer + length, pageHeader.getCompressed_page_size()));
+
+ BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+ .decompress(BytesInput.from(f, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size(),
+ parentColumnReader.columnChunkMetaData.getCodec());
+ currentPage = new Page(
+ bytesIn,
+ pageHeader.data_page_header.num_values,
+ pageHeader.uncompressed_page_size,
+ ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+ ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+ ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ );
+
+ parentColumnReader.readPositionInBuffer += pageHeader.compressed_page_size + length;
+ byteLength = pageHeader.uncompressed_page_size;
+ if (currentPage == null) {
+ return false;
+ }
+
+ // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
+ if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
+ pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
+ }
+ // TODO - would like to get this into the mainline, hopefully before alpha
+ currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
+
+ readPosInBytes = 0;
+ valuesRead = 0;
+ return true;
+ }
+}
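next() measures how many bytes the Thrift-encoded page header consumed by diffing available() before and after readPageHeader, then slices the compressed page body starting right after it. The measuring trick in isolation, with a fixed-size toy header standing in for the Thrift read:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class HeaderLengthExample {
      public static void main(String[] args) throws IOException {
        byte[] buffer = { 0, 0, 0, 7, 42, 43, 44 };   // 4-byte "header", then page body
        ByteArrayInputStream in = new ByteArrayInputStream(buffer);
        int before = in.available();
        new DataInputStream(in).readInt();            // stand-in for readPageHeader(f)
        int headerLength = before - in.available();   // 4; the body starts at this offset
        System.out.println("header length = " + headerLength);
      }
    }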
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
new file mode 100644
index 0000000..f4988a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -0,0 +1,357 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntryFromHDFS;
+import org.apache.drill.exec.physical.ReadEntryWithPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.AffinityCalculator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+
+@JsonTypeName("parquet-scan")
+public class ParquetGroupScan extends AbstractGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
+
+ private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
+ private List<RowGroupInfo> rowGroupInfos;
+
+ public List<ReadEntryWithPath> getEntries() {
+ return entries;
+ }
+
+ public ParquetStorageEngineConfig getEngineConfig() {
+ return this.engineConfig;
+ }
+
+ private List<ReadEntryWithPath> entries;
+ @JsonIgnore
+ private long totalBytes;
+ private Collection<DrillbitEndpoint> availableEndpoints;
+ private ParquetStorageEngine storageEngine;
+ private StorageEngineRegistry engineRegistry;
+ private ParquetStorageEngineConfig engineConfig;
+ private FileSystem fs;
+ private String fileName;
+ private List<EndpointAffinity> endpointAffinities;
+
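+ // Deserialization entry point for physical plans. As a rough, illustrative sketch only (the exact
+ // syntax is defined by the plan reader), a plan fragment might look like:
+ // {"pop":"parquet-scan","entries":[{"path":"/data/file.parquet"}],"storageengine":{...}}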
+ @JsonCreator
+ public ParquetGroupScan(@JsonProperty("entries") List<ReadEntryWithPath> entries,
+ @JsonProperty("storageengine") ParquetStorageEngineConfig storageEngineConfig,
+ @JacksonInject StorageEngineRegistry engineRegistry
+ ) throws SetupException, IOException {
+ engineRegistry.init(DrillConfig.create());
+ this.storageEngine = (ParquetStorageEngine) engineRegistry.getEngine(storageEngineConfig);
+ this.availableEndpoints = storageEngine.getContext().getBits();
+ this.fs = storageEngine.getFileSystem();
+ this.engineConfig = storageEngineConfig;
+ this.engineRegistry = engineRegistry;
+ this.entries = entries;
+ readFooter();
+ this.fileName = rowGroupInfos.get(0).getPath();
+ calculateEndpointBytes();
+ }
+
+ public ParquetGroupScan(ArrayList<ReadEntryWithPath> entries,
+ ParquetStorageEngine storageEngine) throws IOException {
+ this.storageEngine = storageEngine;
+ this.availableEndpoints = storageEngine.getContext().getBits();
+ this.fs = storageEngine.getFileSystem();
+ this.entries = entries;
+ readFooter();
+ this.fileName = rowGroupInfos.get(0).getPath();
+ calculateEndpointBytes();
+ }
+
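+ // Reads the footer of every file in the scan and records one RowGroupInfo per row group: its path,
+ // the offset of its first data page, and its total byte length summed across its column chunks.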
+ private void readFooter() throws IOException {
+ long tA = System.nanoTime();
+ rowGroupInfos = new ArrayList<>();
+ long start = 0, length = 0;
+ ColumnChunkMetaData columnChunkMetaData;
+ for (ReadEntryWithPath readEntryWithPath : entries){
+ Path path = new Path(readEntryWithPath.getPath());
+
+ ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
+
+ int i = 0;
+ for (BlockMetaData rowGroup : footer.getBlocks()){
+ // need to grab block information from HDFS
+ columnChunkMetaData = rowGroup.getColumns().iterator().next();
+ start = columnChunkMetaData.getFirstDataPageOffset();
+ // rowGroup.getTotalByteSize() is not being populated correctly; the column chunks do know their own sizes, so sum them for now
+ //end = start + rowGroup.getTotalByteSize();
+ length = 0;
+ for (ColumnChunkMetaData col : rowGroup.getColumns()){
+ length += col.getTotalSize();
+ }
+ rowGroupInfos.add(new ParquetGroupScan.RowGroupInfo(readEntryWithPath.getPath(), start, length, i));
+ logger.debug("rowGroupInfo path: {} start: {} length {}", readEntryWithPath.getPath(), start, length);
+ i++;
+ }
+ }
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
+ }
+
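+ // For each row group, works out how many of its bytes live on each Drillbit endpoint (from the HDFS
+ // block locations of the first entry's file) and accumulates the total number of bytes to scan.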
+ private void calculateEndpointBytes() {
+ long tA = System.nanoTime();
+ AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
+ for (RowGroupInfo e : rowGroupInfos) {
+ ac.setEndpointBytes(e);
+ totalBytes += e.getLength();
+ }
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
+ }
+/*
+ public LinkedList<RowGroupInfo> getRowGroups() {
+ return rowGroups;
+ }
+
+ public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
+ this.rowGroups = rowGroups;
+ }
+
+ public static class ParquetFileReadEntry {
+
+ String path;
+
+ public ParquetFileReadEntry(@JsonProperty String path){
+ this.path = path;
+ }
+ }
+ */
+
+ @JsonIgnore
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
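+ // Describes a single row group: the file it lives in, its byte range and index within that file, and
+ // (once calculated) how many of its bytes reside on each endpoint.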
+ public static class RowGroupInfo extends ReadEntryFromHDFS {
+
+ private HashMap<DrillbitEndpoint,Long> endpointBytes;
+ private long maxBytes;
+ private int rowGroupIndex;
+
+ @JsonCreator
+ public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
+ @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+ super(path, start, length);
+ this.rowGroupIndex = rowGroupIndex;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - these values are wrong, I cannot know these until after I read a file
+ return new Size(10, 10);
+ }
+
+ public HashMap<DrillbitEndpoint,Long> getEndpointBytes() {
+ return endpointBytes;
+ }
+
+ public void setEndpointBytes(HashMap<DrillbitEndpoint,Long> endpointBytes) {
+ this.endpointBytes = endpointBytes;
+ }
+
+ public void setMaxBytes(long bytes) {
+ this.maxBytes = bytes;
+ }
+
+ public long getMaxBytes() {
+ return maxBytes;
+ }
+
+ public ParquetRowGroupScan.RowGroupReadEntry getRowGroupReadEntry() {
+ return new ParquetRowGroupScan.RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+ }
+
+ public int getRowGroupIndex() {
+ return this.rowGroupIndex;
+ }
+ }
+
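+ // Orders row groups by the largest byte count any single endpoint holds for them, ascending.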
+ private class ParquetReadEntryComparator implements Comparator<RowGroupInfo> {
+ public int compare(RowGroupInfo e1, RowGroupInfo e2) {
+ if (e1.getMaxBytes() == e2.getMaxBytes()) return 0;
+ return (e1.getMaxBytes() > e2.getMaxBytes()) ? 1 : -1;
+ }
+ }
+
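+ // An endpoint's affinity is the fraction of the total scan bytes local to it, summed over all row groups.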
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ long tA = System.nanoTime();
+ if (this.endpointAffinities == null) {
+ HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
+ for (RowGroupInfo entry : rowGroupInfos) {
+ for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
+ long bytes = entry.getEndpointBytes().get(d);
+ float affinity = (float)bytes / (float)totalBytes;
+ logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
+ if (affinities.containsKey(d)) {
+ affinities.put(d, affinities.get(d) + affinity);
+ } else {
+ affinities.put(d, affinity);
+ }
+ }
+ }
+ List<EndpointAffinity> affinityList = new LinkedList<>();
+ for (DrillbitEndpoint d : affinities.keySet()) {
+ logger.debug("Endpoint {} has affinity {}", d.getAddress(), affinities.get(d).floatValue());
+ affinityList.add(new EndpointAffinity(d,affinities.get(d).floatValue()));
+ }
+ this.endpointAffinities = affinityList;
+ }
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
+ return this.endpointAffinities;
+ }
+
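+ // Assigns row groups to minor fragments in several passes, relaxing the locality requirement each time:
+ // first endpoints holding 100% of a row group's best-endpoint byte count, then 50%, then 25%, then any
+ // endpoint with spare capacity, and finally force-assigning whatever is left.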
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+ long tA = System.nanoTime();
+ Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
+
+ int i = 0;
+ for (DrillbitEndpoint endpoint : endpoints) {
+ logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
+ }
+
+ Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
+ mappings = new LinkedList[endpoints.size()];
+ LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
+ LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
+ LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
+ LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
+ LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
+ assert unassigned5.isEmpty() : "All readEntries should be assigned by now, but some are still unassigned";
+ long tB = System.nanoTime();
+ logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
+ }
+
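+ // One assignment pass. requiredPercentage is the minimum share of a row group's best-endpoint byte count
+ // an endpoint must hold; mustContain requires the endpoint to hold at least one byte of the row group;
+ // assignAll ignores every criterion and assigns unconditionally. Returns the row groups left unassigned.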
+ private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
+ Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
+ LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
+
+ // cap each endpoint at roughly 1.5x its fair share of row groups; multiply before dividing so integer division doesn't truncate the share
+ int maxEntries = (int) (rowGroupInfos.size() * 1.5 / endpoints.size());
+
+ if (maxEntries < 1) maxEntries = 1;
+
+ int i = 0;
+ for(RowGroupInfo e : rowGroups) {
+ boolean assigned = false;
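+ // walk the endpoints round-robin, starting at an offset that rotates with the row group index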
+ for (int j = i; j < i + endpoints.size(); j++) {
+ DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
+ // a row group is assigned here if we are force-assigning, or if this endpoint meets the locality and capacity criteria;
+ // look the byte count up once so a missing endpoint (possible when mustContain is false) doesn't cause an NPE on unboxing
+ Long bytesOnEndpoint = e.getEndpointBytes().get(currentEndpoint);
+ if (assignAll ||
+ (e.getEndpointBytes().size() > 0 &&
+ (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
+ (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
+ (bytesOnEndpoint == null ? 0 : bytesOnEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
+ LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
+ if (entries == null) {
+ entries = new LinkedList<>();
+ mappings[j%endpoints.size()] = entries;
+ }
+ entries.add(e.getRowGroupReadEntry());
+ logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
+ assigned = true;
+ break;
+ }
+ }
+ if (!assigned) unassigned.add(e);
+ i++;
+ }
+ return unassigned;
+ }
+
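+ // Builds the sub-scan for one minor fragment, containing exactly the row groups assigned to it above.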
+ @Override
+ public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
+ assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+ for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
+ logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
+ }
+ try {
+ return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
+ } catch (SetupException e) {
+ logger.error("Failed to create ParquetRowGroupScan for minor fragment {}", minorFragmentId, e); // TODO - propagate this instead of swallowing it
+ }
+ return null;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return rowGroupInfos.size();
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 1, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - this is wrong, need to populate correctly
+ return new Size(10, 10);
+ }
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ // TODO - return a copy of self
+ return this;
+ }
+
+}