Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:48 UTC

[10/27] Initial Parquet commit. Supports INT, LONG, FLOAT, DOUBLE, distributed scheduling.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index bad3a03..a40031e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -54,6 +54,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
       this.incoming = batch;
+      assert(incoming != null);
       this.handle = context.getHandle();
       this.recMajor = config.getOppositeMajorFragmentId();
       this.tunnel = context.getCommunicator().getTunnel(config.getDestination());
@@ -74,12 +75,14 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       case NONE:
         FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
+        b2.release();
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
+        batch.release();
         return true;
 
       case NOT_YET:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 1148c93..b04e154 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
 
 public class PhysicalPlanReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -45,7 +47,8 @@ public class PhysicalPlanReader {
   private final ObjectReader operatorReader;
   private final ObjectReader logicalPlanReader;
 
-  public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint) {
+  public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint,
+                            final StorageEngineRegistry engineRegistry) {
 
     // Endpoint serializer/deserializer.
     SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
@@ -58,6 +61,7 @@ public class PhysicalPlanReader {
     mapper.registerModule(deserModule);
     mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
     InjectableValues injectables = new InjectableValues.Std() //
+            .addValue(StorageEngineRegistry.class, engineRegistry) //
         .addValue(DrillbitEndpoint.class, endpoint); //
 
     this.mapper = mapper;
@@ -66,6 +70,14 @@ public class PhysicalPlanReader {
     this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables);
   }
 
+  // TODO - we do not want the storage engine registry generated here in production; this was created to keep old
+  // tests passing. This constructor should be removed and the tests should be updated to use the constructor
+  // that takes a storage engine registry.
+  @Deprecated
+  public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint) {
+    this(config, mapper, endpoint, null);
+  }
+
   public String writeJson(PhysicalOperator op) throws JsonProcessingException{
     return mapper.writeValueAsString(op);
   }

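For context on the injection added above: the following is a minimal, self-contained sketch (not Drill code; Registry and ScanConfig are hypothetical stand-ins) of how Jackson's InjectableValues lets a deserialized object receive a runtime-only service that the JSON itself never carries, mirroring the .addValue(StorageEngineRegistry.class, engineRegistry) call in the constructor above.

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InjectableValuesSketch {

  // Hypothetical stand-in for a runtime service such as a storage engine registry.
  static class Registry {
    String lookup(String name) { return "engine:" + name; }
  }

  // Hypothetical stand-in for an operator config: the JSON carries only configuration;
  // the registry is injected at read time rather than serialized.
  static class ScanConfig {
    final String engineName;
    final Registry registry;

    @JsonCreator
    public ScanConfig(@JsonProperty("engineName") String engineName,
                      @JacksonInject Registry registry) {
      this.engineName = engineName;
      this.registry = registry;
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    InjectableValues injectables = new InjectableValues.Std()
        .addValue(Registry.class, new Registry());
    ScanConfig config = mapper.reader(ScanConfig.class)
        .with(injectables)
        .readValue("{\"engineName\":\"parquet\"}");
    System.out.println(config.registry.lookup(config.engineName)); // prints engine:parquet
  }
}
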
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index 4188435..d3c8cee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
 
 /**
  * Responsible for breaking a plan into its constituent Fragments.
@@ -42,7 +43,13 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
     exchange.getChild().accept(this, next);
     return value;
   }
-  
+
+  @Override
+  public Fragment visitSubScan(SubScan subScan, Fragment value) throws FragmentSetupException {
+    // TODO - implement this
+    return super.visitOp(subScan, value);
+  }
+
   @Override
   public Fragment visitOp(PhysicalOperator op, Fragment value)  throws FragmentSetupException{
 //    logger.debug("Visiting Other {}", op);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index da71271..ab91d76 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -22,11 +22,8 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
 
 import com.google.common.collect.Lists;
 
@@ -53,8 +50,14 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
   }
 
   @Override
-  public PhysicalOperator visitScan(Scan<?> scan, IndexedFragmentNode iNode) throws ExecutionSetupException {
-    return scan.getSpecificScan(iNode.getMinorFragmentId());
+  public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException {
+    return groupScan.getSpecificScan(iNode.getMinorFragmentId());
+  }
+
+  @Override
+  public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) throws ExecutionSetupException {
+    // TODO - implement this
+    return super.visitOp(subScan, value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index e3bcff0..2ccb17c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -127,9 +127,11 @@ public class SimpleParallelizer {
             .build();
 
         if (isRootNode) {
+          logger.debug("Root fragment {}", fragment);
           rootFragment = fragment;
           rootOperator = root;
         } else {
+          logger.debug("Remote fragment {}", fragment);
           fragments.add(fragment);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index af8ec04..2ef5295 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -17,11 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.planner.fragment;
 
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.HasAffinity;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 
@@ -75,10 +72,16 @@ public class StatsCollector {
     }
 
     @Override
-    public Void visitScan(Scan<?> scan, Wrapper wrapper) {
+    public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
       Stats stats = wrapper.getStats();      
-      stats.addMaxWidth(scan.getReadEntries().size());
-      return super.visitScan(scan, wrapper);
+      stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
+      return super.visitGroupScan(groupScan, wrapper);
+    }
+
+    @Override
+    public Void visitSubScan(SubScan subScan, Wrapper value) throws RuntimeException {
+      // TODO - implement this
+      return super.visitOp(subScan, value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 0dfcb62..d5a24b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -25,12 +25,10 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
@@ -113,9 +111,15 @@ public class Wrapper {
     }
 
     @Override
-    public Void visitScan(Scan<?> scan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
-      scan.applyAssignments(value);
-      return super.visitScan(scan, value);
+    public Void visitGroupScan(GroupScan groupScan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      groupScan.applyAssignments(value);
+      return super.visitGroupScan(groupScan, value);
+    }
+
+    @Override
+    public Void visitSubScan(SubScan subScan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+      // TODO - implement this
+      return visitOp(subScan, value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index c19065d..964ef5c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -55,5 +55,11 @@ public class FragmentWritableBatch{
     return header;
   }
   
+  public void release(){
+    for(ByteBuf b : buffers){
+      b.release();
+    }
+  }
+  
   
 }

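The release() method added here, and called from SingleSenderCreator above, follows the usual Netty reference-counting pattern: the holder of a batch drops its reference on each underlying ByteBuf once it is done with them. A minimal, self-contained sketch of that pattern (not Drill code):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ReleaseSketch {
  public static void main(String[] args) {
    ByteBuf[] buffers = {
        Unpooled.wrappedBuffer(new byte[]{1, 2, 3}),
        Unpooled.wrappedBuffer(new byte[]{4, 5, 6})
    };

    // ... in the real code the buffers would be handed to the RPC layer here ...

    // Drop this holder's reference on every buffer, mirroring FragmentWritableBatch.release().
    for (ByteBuf b : buffers) {
      b.release();
    }
    System.out.println(buffers[0].refCnt()); // 0 once the last reference is gone
  }
}
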
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 593c28c..4d47404 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,9 +41,9 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
 
   private VectorContainer container = new VectorContainer();
   private final BufferAllocator allocator;
-  private int recordCount; 
+  private int valueCount;
   private BatchSchema schema;
-  
+
   public RecordBatchLoader(BufferAllocator allocator) {
     super();
     this.allocator = allocator;
@@ -51,18 +51,18 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
 
   /**
    * Load a record batch from a single buffer.
-   * 
+   *
    * @param def
    *          The definition for the record batch.
    * @param buf
    *          The buffer that holds the data associated with the record batch
    * @return Whether or not the schema changed since the previous load.
-   * @throws SchemaChangeException 
+   * @throws SchemaChangeException
    */
   public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException {
 //    logger.debug("Loading record batch with def {} and data {}", def, buf);
-    this.recordCount = def.getRecordCount();
-    boolean schemaChanged = false;
+    this.valueCount = def.getRecordCount();
+    boolean schemaChanged = schema == null;
 
     Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
     for(VectorWrapper<?> w : container){
@@ -73,7 +73,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
     VectorContainer newVectors = new VectorContainer();
 
     List<FieldMetadata> fields = def.getFieldList();
-    
+
     int bufOffset = 0;
     for (FieldMetadata fmd : fields) {
       FieldDef fieldDef = fmd.getDef();
@@ -82,23 +82,27 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
         container.add(v);
         continue;
       }
-      
+
       // if we arrive here, we didn't have a matching vector.
       schemaChanged = true;
       MaterializedField m = new MaterializedField(fieldDef);
       v = TypeHelper.getNewVector(m, allocator);
-      v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      if (fmd.getValueCount() == 0){
+        v.clear();
+      } else {
+        v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      }
       bufOffset += fmd.getBufferLength();
       newVectors.add(v);
     }
-    
+
     if(!oldFields.isEmpty()){
       schemaChanged = true;
       for(ValueVector v : oldFields.values()){
         v.close();
       }
     }
-    
+
     // rebuild the schema.
     SchemaBuilder b = BatchSchema.newBuilder();
     for(VectorWrapper<?> v : newVectors){
@@ -132,7 +136,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
 //  }
 
   public int getRecordCount() {
-    return recordCount;
+    return valueCount;
   }
 
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
@@ -140,7 +144,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
   }
   
   public WritableBatch getWritableBatch(){
-    return WritableBatch.getBatchNoSVWrap(recordCount, container);
+    return WritableBatch.getBatchNoSVWrap(valueCount, container);
   }
 
   @Override
@@ -152,6 +156,6 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
     return schema;
   }
 
-  
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index e84bf37..cac042b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -71,7 +71,6 @@ public class WritableBatch {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
 
-    
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());
       

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index 6c0158a..8c8b6b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageEncoder;
 
@@ -28,6 +29,7 @@ import java.util.List;
 import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.WireFormat;
 
@@ -103,10 +105,16 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
         cos.writeRawVarint32(rawBodyLength);
         cos.flush(); // need to flush so that dbody goes after if cos is caching.
         
-        out.add(buf);
+        CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1);
+        cbb.addComponent(buf);
+        int bufLength = buf.readableBytes();
         for(ByteBuf b : msg.dBodies){
-          out.add(b);
+          cbb.addComponent(b);
+          bufLength += b.readableBytes();
         }
+        cbb.writerIndex(bufLength);
+        out.add(cbb);
+        
         
       }else{
         cos.flush();

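The encoder change above stops adding the header buffer and each data body to the outbound list separately and instead combines them into one CompositeByteBuf. A minimal sketch of that composition (not Drill code): note that addComponent does not advance the writer index, which is why it is set explicitly, just as the cbb.writerIndex(bufLength) call above does.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class CompositeSketch {
  public static void main(String[] args) {
    ByteBuf header = Unpooled.wrappedBuffer(new byte[]{0, 42});
    ByteBuf body = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4});

    CompositeByteBuf message = Unpooled.compositeBuffer(2);
    message.addComponent(header);
    message.addComponent(body);
    // addComponent leaves the writer index untouched, so cover all readable bytes explicitly.
    message.writerIndex(header.readableBytes() + body.readableBytes());

    System.out.println(message.readableBytes()); // 6
  }
}
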
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index c933594..eb160be 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.store.StorageEngine;
 
 import com.google.common.base.Preconditions;
 import com.yammer.metrics.MetricRegistry;
+import org.apache.drill.exec.store.StorageEngineRegistry;
 
 public class DrillbitContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -46,6 +48,7 @@ public class DrillbitContext {
   private final BitCom com;
   private final DistributedCache cache;
   private final DrillbitEndpoint endpoint;
+  private final StorageEngineRegistry storageEngineRegistry;
   
   public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) {
     super();
@@ -58,7 +61,8 @@ public class DrillbitContext {
     this.com = com;
     this.cache = cache;
     this.endpoint = endpoint;
-    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint);
+    this.storageEngineRegistry = new StorageEngineRegistry(this);
+    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
   }
   
   public DrillbitEndpoint getEndpoint(){
@@ -77,8 +81,8 @@ public class DrillbitContext {
     return context.getAllocator();
   }
   
-  public StorageEngine getStorageEngine(StorageEngineConfig config){
-    throw new UnsupportedOperationException();
+  public StorageEngine getStorageEngine(StorageEngineConfig config) throws SetupException {
+    return storageEngineRegistry.getEngine(config);
   }
   
   public NioEventLoopGroup getBitLoopGroup(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index b07f274..bb1f2e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -44,6 +44,7 @@ public class ServiceEngine implements Closeable{
   private final UserServer userServer;
   private final BitCom bitCom;
   private final DrillConfig config;
+  boolean useIP = false;
   
   public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){
     this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker);
@@ -53,8 +54,10 @@ public class ServiceEngine implements Closeable{
   
   public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
     int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
+    String address = useIP ?  InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
     DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
-        .setAddress(InetAddress.getLocalHost().getHostAddress())
+        .setAddress(address)
+        //.setAddress("localhost")
         .setUserPort(userPort)
         .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index 80704fa..9c48052 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.ListMultimap;
@@ -48,7 +50,7 @@ public class AbstractStorageEngine implements StorageEngine{
   }
 
   @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
new file mode 100644
index 0000000..b4092cc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -0,0 +1,112 @@
+package org.apache.drill.exec.store;
+
+
+import com.google.common.collect.ImmutableRangeMap;
+import com.google.common.collect.Range;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.*;
+
+public class AffinityCalculator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
+
+
+  BlockLocation[] blocks;
+  ImmutableRangeMap<Long,BlockLocation> blockMap;
+  FileSystem fs;
+  String fileName;
+  Collection<DrillbitEndpoint> endpoints;
+  HashMap<String,DrillbitEndpoint> endPointMap;
+
+  public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
+    this.fs = fs;
+    this.fileName = fileName;
+    this.endpoints = endpoints;
+    buildBlockMap();
+    buildEndpointMap();
+  }
+
+  private void buildBlockMap() {
+    try {
+      FileStatus file = fs.getFileStatus(new Path(fileName));
+      long tC = System.nanoTime();
+      blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
+      long tD = System.nanoTime();
+      logger.debug("Block locations: {}", blocks);
+      logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6);
+    } catch (IOException ioe) { throw new RuntimeException(ioe); }
+    long tA = System.nanoTime();
+    ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
+    for (BlockLocation block : blocks) {
+      long start = block.getOffset();
+      long end = start + block.getLength();
+      Range<Long> range = Range.closedOpen(start, end);
+      blockMapBuilder = blockMapBuilder.put(range, block);
+    }
+    blockMap = blockMapBuilder.build();
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
+  }
+  /**
+   *
+   * @param entry
+   */
+  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
+    long tA = System.nanoTime();
+    HashMap<String,Long> hostMap = new HashMap<>();
+    long start = entry.getStart();
+    long end = start + entry.getLength();
+    Range<Long> entryRange = Range.closedOpen(start, end);
+    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
+    for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
+      String[] hosts = null;
+      Range<Long> blockRange = e.getKey();
+      try {
+        hosts = e.getValue().getHosts();
+      } catch (IOException ioe) { /*TODO Handle this exception */}
+      Range<Long> intersection = entryRange.intersection(blockRange);
+      long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+      for (String host : hosts) {
+        if (hostMap.containsKey(host)) {
+          hostMap.put(host, hostMap.get(host) + bytes);
+        } else {
+          hostMap.put(host, bytes);
+        }
+      }
+    }
+    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
+    try {
+      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
+        String host = hostEntry.getKey();
+        Long bytes = hostEntry.getValue();
+        DrillbitEndpoint d = getDrillBitEndpoint(host);
+        if (d != null ) ebs.put(d, bytes);
+      }
+    } catch (NullPointerException n) {}
+    entry.setEndpointBytes(ebs);
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
+  }
+
+  private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
+    return endPointMap.get(hostName);
+  }
+
+  private void buildEndpointMap() {
+    long tA = System.nanoTime();
+    endPointMap = new HashMap<String, DrillbitEndpoint>();
+    for (DrillbitEndpoint d : endpoints) {
+      String hostName = d.getAddress();
+      endPointMap.put(hostName, d);
+    }
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
+  }
+}

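The core of AffinityCalculator is a Guava ImmutableRangeMap keyed by HDFS block byte ranges; a row group's byte range is intersected with the overlapping blocks to weight how many of its bytes each host holds. A minimal sketch of that lookup (not Drill code; the host names and offsets are made up):

import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
import java.util.Map;

public class RangeMapSketch {
  public static void main(String[] args) {
    // Map each block's [offset, offset + length) range to the host holding it.
    ImmutableRangeMap<Long, String> blockMap = new ImmutableRangeMap.Builder<Long, String>()
        .put(Range.closedOpen(0L, 128L), "host-a")
        .put(Range.closedOpen(128L, 256L), "host-b")
        .build();

    // A hypothetical row group spanning bytes [100, 200) overlaps both blocks.
    Range<Long> rowGroup = Range.closedOpen(100L, 200L);
    for (Map.Entry<Range<Long>, String> e : blockMap.subRangeMap(rowGroup).asMapOfRanges().entrySet()) {
      Range<Long> overlap = e.getKey().intersection(rowGroup);
      long bytes = overlap.upperEndpoint() - overlap.lowerEndpoint();
      System.out.println(e.getValue() + " holds " + bytes + " bytes of the row group");
    }
  }
}
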
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
index 4884b7a..b24521d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -23,6 +23,8 @@ import java.util.List;
 
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.ListMultimap;
@@ -40,15 +42,15 @@ public interface StorageEngine {
   public List<QueryOptimizerRule> getOptimizerRules();
 
   /**
-   * Get the set of read entries required for a particular Scan (read) node. This is somewhat analogous to traditional
-   * MapReduce. The difference is, this is the most granular split paradigm.
+   * Get the physical scan operator populated with a set of read entries required for the particular GroupScan (read) node.
+   * This is somewhat analogous to traditional MapReduce. The difference is, this is the most granular split paradigm.
    * 
    * @param scan
    *          The configured scan entries.
    * @return
    * @throws IOException
    */
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException;
 
   /**
    * Get the set of Drillbit endpoints that are available for each read entry. Note that it is possible for a read entry
@@ -104,10 +106,6 @@ public interface StorageEngine {
    */
   public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException;
 
-  public interface ReadEntry {
-    public Cost getCostEstimate();
-  }
-
   public interface WriteEntry {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index eef878e..26504a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -38,19 +38,19 @@ public class StorageEngineRegistry {
 
   private DrillbitContext context;
   public StorageEngineRegistry(DrillbitContext context){
+    init(context.getConfig());
     this.context = context;
-    setup(context.getConfig());
   }
   
   @SuppressWarnings("unchecked")
-  public void setup(DrillConfig config){
+  public void init(DrillConfig config){
     Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
     logger.debug("Loading storage engines {}", engines);
     for(Class<? extends StorageEngine> engine: engines){
       int i =0;
       for(Constructor<?> c : engine.getConstructors()){
         Class<?>[] params = c.getParameterTypes();
-        if(params.length != 2 || params[1] == DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+        if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
           logger.debug("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
           continue;
         }

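The one-character fix above (== changed to !=) matters because the constructor scan is meant to reject anything that is not of the shape (StorageEngineConfig, DrillbitContext); with ==, the valid constructors were the ones being skipped. A minimal reflection sketch of the corrected check (not Drill code; Engine, Config, and Context are hypothetical):

import java.lang.reflect.Constructor;

public class ConstructorScanSketch {
  static class Context {}
  static class Config {}
  static class Engine {
    public Engine(Config config, Context context) {}
    public Engine(String bogus, int alsoBogus) {}
  }

  public static void main(String[] args) {
    for (Constructor<?> c : Engine.class.getConstructors()) {
      Class<?>[] params = c.getParameterTypes();
      // Skip anything that is not exactly (Config, Context); note the != on the
      // second parameter, which is the comparison the commit flips.
      if (params.length != 2 || params[1] != Context.class || !Config.class.isAssignableFrom(params[0])) {
        System.out.println("skipping " + c);
        continue;
      }
      System.out.println("usable " + c);
    }
  }
}
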
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index d2ad72a..7cbea57 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -27,10 +27,11 @@ public class VectorHolder {
   private ValueVector vector;
   private int currentLength;
 
-  VectorHolder(ValueVector vector) {
-    this.length = vector.getValueCapacity();
-    this.vector = vector;
-  }
+    public VectorHolder(int length, ValueVector vector) {
+        this.length = length;
+        this.vector = vector;
+        this.mutator = vector.getMutator();
+    }
 
   public ValueVector getValueVector() {
     return vector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
new file mode 100644
index 0000000..c85d4aa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+public final class BitReader extends ColumnReader {
+
+  byte currentByte;
+  byte nextByte;
+
+  BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+            boolean fixedLength, ValueVector v) {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+  }
+
+  @Override
+  protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+
+    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
+        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+    readStartInBytes = pageReadStatus.readPosInBytes;
+    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+    readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+    bytes = pageReadStatus.pageDataByteArray;
+    // standard read, using memory mapping
+    if (pageReadStatus.bitShift == 0) {
+      ((BaseDataValueVector) valueVecHolder.getValueVector()).getData().writeBytes(bytes,
+          (int) readStartInBytes, (int) readLength);
+    } else { // read in individual values, because a bit shift is necessary due to where the last page or batch ended
+
+      vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+      nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
+      readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
+      //recordsReadInThisIteration -= (8 - pageReadStatus.bitShift);
+
+      int i = 0;
+      // read individual bytes with appropriate shifting
+      for (; i <= (int) readLength; i++) {
+        currentByte = nextByte;
+        currentByte = (byte) (currentByte >>> pageReadStatus.bitShift);
+        // mask the bits about to be added from the next byte
+        currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReadStatus.bitShift - 1]);
+        // if we are not on the last byte
+        if ((int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i < pageReadStatus.byteLength) {
+          // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
+          nextByte = bytes[(int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i];
+          currentByte = (byte) (currentByte | nextByte
+              << (8 - pageReadStatus.bitShift)
+              & ParquetRecordReader.endBitMasks[8 - pageReadStatus.bitShift - 1]);
+        }
+        vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
+      }
+      vectorData.setIndex(0, (valuesReadInCurrentPass / 8)
+          + (int) readLength - 1);
+      vectorData.capacity(vectorData.writerIndex() + 1);
+    }
+
+    // if the values in this page did not end on a byte boundary, store the number of bits the next page must be
+    // shifted by to read all of the values into the vector without leaving space
+    if (readLengthInBits % 8 != 0) {
+      pageReadStatus.bitShift = (int) readLengthInBits % 8;
+    } else {
+      pageReadStatus.bitShift = 0;
+    }
+  }
+}

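BitReader above handles bit-packed BOOLEAN values that may not end on a byte boundary: when a page or batch stops mid-byte, the leftover offset (bitShift) has to be applied while stitching the next bits onto the output. A minimal sketch of that shift-and-mask step (not Drill code; the startBitMasks/endBitMasks arrays in ParquetRecordReader are replaced here by a plain 0xFF mask):

public class BitShiftSketch {
  public static void main(String[] args) {
    // Suppose the previous batch consumed only 3 bits of the first byte,
    // so the next packed value starts at bit offset 3.
    int bitShift = 3;
    byte current = (byte) 0b10110101;
    byte next = (byte) 0b01101100;

    // Shift out the already-consumed low bits, then pull the missing high bits
    // from the following byte, mirroring the shift-and-mask loop above.
    int stitched = ((current & 0xFF) >>> bitShift) | ((next & 0xFF) << (8 - bitShift));
    System.out.println(Integer.toBinaryString(stitched & 0xFF));
  }
}
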
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
new file mode 100644
index 0000000..8b4f760
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.store.VectorHolder;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+
+public abstract class ColumnReader {
+  // Value Vector for this column
+  VectorHolder valueVecHolder;
+  // column description from the parquet library
+  ColumnDescriptor columnDescriptor;
+  // metadata of the column, from the parquet library
+  ColumnChunkMetaData columnChunkMetaData;
+  // status information on the current page
+  PageReadStatus pageReadStatus;
+
+  long readPositionInBuffer;
+
+  int compressedSize;
+
+  // quick reference to see if the field is fixed length (as this requires an instanceof)
+  boolean isFixedLength;
+  // counter for the total number of values read from one or more pages
+  // when a batch is filled all of these values should be the same for each column
+  int totalValuesRead;
+  // counter for the values that have been read in this pass (a single call to the next() method)
+  int valuesReadInCurrentPass;
+  // length of single data value in bits, if the length is fixed
+  int dataTypeLengthInBits;
+  int bytesReadInCurrentPass;
+  ParquetRecordReader parentReader;
+
+  ByteBuf vectorData;
+
+  // variables for a single read pass
+  long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
+  byte[] bytes;
+
+  ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+               boolean fixedLength, ValueVector v){
+    this.parentReader = parentReader;
+    if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, (BaseDataValueVector) v);
+    else valueVecHolder = new VectorHolder(5000, (BaseDataValueVector) v);
+
+    columnDescriptor = descriptor;
+    this.columnChunkMetaData = columnChunkMetaData;
+    isFixedLength = fixedLength;
+
+    pageReadStatus = new PageReadStatus(this, parentReader.getRowGroupIndex(), parentReader.getBufferWithAllData());
+
+    if (parentReader.getRowGroupIndex() != 0) readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset() - 4;
+    else readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset();
+
+    if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+      dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
+    }
+  }
+
+  public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+    readStartInBytes = 0;
+    readLength = 0;
+    readLengthInBits = 0;
+    recordsReadInThisIteration = 0;
+    vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+    do {
+      // if no page has been read, or all of the records have been read out of a page, read the next one
+      if (pageReadStatus.currentPage == null
+          || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
+        totalValuesRead += pageReadStatus.valuesRead;
+        if (!pageReadStatus.next()) {
+          break;
+        }
+      }
+
+      readField( recordsToReadInThisPass, firstColumnStatus);
+
+      valuesReadInCurrentPass += recordsReadInThisIteration;
+      totalValuesRead += recordsReadInThisIteration;
+      pageReadStatus.valuesRead += recordsReadInThisIteration;
+      if (readStartInBytes + readLength >= pageReadStatus.byteLength) {
+        pageReadStatus.next();
+      } else {
+        pageReadStatus.readPosInBytes = readStartInBytes + readLength;
+      }
+    }
+    while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
+    ((BaseDataValueVector) valueVecHolder.getValueVector()).getMutator().setValueCount(
+        valuesReadInCurrentPass);
+  }
+
+  protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
new file mode 100644
index 0000000..355405b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+public class FixedByteAlignedReader extends ColumnReader {
+
+  FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                         boolean fixedLength, ValueVector v) {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+  }
+
+  // this method is called by its superclass during a read loop
+  @Override
+  protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+
+    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
+        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+    readStartInBytes = pageReadStatus.readPosInBytes;
+    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+    readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+    bytes = pageReadStatus.pageDataByteArray;
+    // vectorData is assigned by the superclass read loop method
+    vectorData.writeBytes(bytes,
+        (int) readStartInBytes, (int) readLength);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
new file mode 100644
index 0000000..29d9cc7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import parquet.bytes.BytesInput;
+import parquet.column.page.Page;
+import parquet.column.page.PageReader;
+import parquet.format.PageHeader;
+
+import java.io.IOException;
+
+import static parquet.format.Util.readPageHeader;
+
+// class to keep track of the read position of variable length columns
+public final class PageReadStatus {
+
+  ColumnReader parentColumnReader;
+
+  // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
+  Page currentPage;
+  // buffer to store bytes of current page, set to max size of parquet page
+  byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+  PageReader pageReader;
+  // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
+  long readPosInBytes;
+  // bit shift needed for the next page if the last one did not line up with a byte boundary
+  int bitShift;
+  // storage space for extra bits at the end of a page if they did not line up with a byte boundary
+  // prevents the need to keep the entire last page, as these bits need to be added to the next batch
+  //byte extraBits;
+  // the number of values read out of the last page
+  int valuesRead;
+  int byteLength;
+  int rowGroupIndex;
+  // buffer with the compressed data of an entire row group
+  ByteBuf bufferWithAllData;
+
+  PageReadStatus(ColumnReader parentStatus, int rowGroupIndex, ByteBuf bufferWithAllData){
+    this.parentColumnReader = parentStatus;
+    this.rowGroupIndex = rowGroupIndex;
+  }
+
+  /**
+   * Grab the next page.
+   *
+   * @return - if another page was present
+   * @throws java.io.IOException
+   */
+  public boolean next() throws IOException {
+
+    int shift = 0;
+    if (rowGroupIndex == 0) shift = 0;
+    else shift = 4;
+    // the first row group has a different end point, because there are four bytes ("PAR1") at the beginning of the file
+    if (parentColumnReader.readPositionInBuffer + shift == parentColumnReader.columnChunkMetaData.getFirstDataPageOffset() + parentColumnReader.columnChunkMetaData.getTotalSize()){
+      return false;
+    }
+    // TODO - in the Parquet JIRA, Steven posted a stack trace for an error with a row group containing 3 values.
+    // The Math.min with the end of the buffer should fix it, but results are not coming back; leaving it here for now
+    // because it is needed, though there might be a problem with it.
+    ByteBufInputStream f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
+        (int) parentColumnReader.readPositionInBuffer,
+        Math.min(50, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer)));
+    int before = f.available();
+    PageHeader pageHeader = readPageHeader(f);
+    int length = before - f.available();
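+    // re-slice the row group buffer so the stream covers exactly the compressed page body that follows the header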
+    f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
+        (int) parentColumnReader.readPositionInBuffer + length, pageHeader.getCompressed_page_size()));
+
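+    // decompress the page body with the codec recorded in this column chunk's metadata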
+    BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+        .decompress(BytesInput.from(f, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size(),
+            parentColumnReader.columnChunkMetaData.getCodec());
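+    // wrap the uncompressed bytes in a Page, carrying over the value count and encodings from the page header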
+    currentPage = new Page(
+        bytesIn,
+        pageHeader.data_page_header.num_values,
+        pageHeader.uncompressed_page_size,
+        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+    );
+
+    parentColumnReader.readPositionInBuffer += pageHeader.compressed_page_size + length;
+    byteLength = pageHeader.uncompressed_page_size;
+    if (currentPage == null) {
+      return false;
+    }
+
+    // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
+    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
+      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
+    }
+    // TODO - would like to get this into the mainline, hopefully before alpha
+    currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
+
+    readPosInBytes = 0;
+    valuesRead = 0;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
new file mode 100644
index 0000000..f4988a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -0,0 +1,357 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntryFromHDFS;
+import org.apache.drill.exec.physical.ReadEntryWithPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.AffinityCalculator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+
+@JsonTypeName("parquet-scan")
+public class ParquetGroupScan extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
+
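+  // one list of row group read entries per assigned minor fragment, built in applyAssignments()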
+  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
+  private List<RowGroupInfo> rowGroupInfos;
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+
+  public ParquetStorageEngineConfig getEngineConfig() {
+    return this.engineConfig;
+  }
+
+  private List<ReadEntryWithPath> entries;
+  @JsonIgnore
+  private long totalBytes;
+  private Collection<DrillbitEndpoint> availableEndpoints;
+  private ParquetStorageEngine storageEngine;
+  private StorageEngineRegistry engineRegistry;
+  private ParquetStorageEngineConfig engineConfig;
+  private FileSystem fs;
+  private String fileName;
+  private List<EndpointAffinity> endpointAffinities;
+
+  @JsonCreator
+  public ParquetGroupScan(@JsonProperty("entries") List<ReadEntryWithPath> entries,
+                          @JsonProperty("storageengine") ParquetStorageEngineConfig storageEngineConfig,
+                          @JacksonInject StorageEngineRegistry engineRegistry
+                          ) throws SetupException, IOException {
+    engineRegistry.init(DrillConfig.create());
+    this.storageEngine = (ParquetStorageEngine) engineRegistry.getEngine(storageEngineConfig);
+    this.availableEndpoints = storageEngine.getContext().getBits();
+    this.fs = storageEngine.getFileSystem();
+    this.engineConfig = storageEngineConfig;
+    this.engineRegistry = engineRegistry;
+    this.entries = entries;
+    readFooter();
+    this.fileName = rowGroupInfos.get(0).getPath();
+    calculateEndpointBytes();
+  }
+
+  public ParquetGroupScan(ArrayList<ReadEntryWithPath> entries,
+                          ParquetStorageEngine storageEngine) throws IOException {
+    this.storageEngine = storageEngine;
+    this.availableEndpoints = storageEngine.getContext().getBits();
+    this.fs = storageEngine.getFileSystem();
+    this.entries = entries;
+    readFooter();
+    this.fileName = rowGroupInfos.get(0).getPath();
+    calculateEndpointBytes();
+  }
+
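+  /**
+   * Reads the Parquet footer of each entry and records one RowGroupInfo (path, start offset, length,
+   * row group index) per row group found.
+   */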
+  private void readFooter() throws IOException {
+    long tA = System.nanoTime();
+    rowGroupInfos = new ArrayList<>();
+    long start = 0, length = 0;
+    ColumnChunkMetaData columnChunkMetaData;
+    for (ReadEntryWithPath readEntryWithPath : entries){
+      Path path = new Path(readEntryWithPath.getPath());
+
+      ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
+
+      int i = 0;
+      for (BlockMetaData rowGroup : footer.getBlocks()){
+        // need to grab block information from HDFS
+        columnChunkMetaData = rowGroup.getColumns().iterator().next();
+        start = columnChunkMetaData.getFirstDataPageOffset();
+        // the row group's total byte size field is not populated correctly, but the column chunks know their own sizes, so sum those for now
+        //end = start + rowGroup.getTotalByteSize();
+        length = 0;
+        for (ColumnChunkMetaData col : rowGroup.getColumns()){
+          length += col.getTotalSize();
+        }
+        rowGroupInfos.add(new ParquetGroupScan.RowGroupInfo(readEntryWithPath.getPath(), start, length, i));
+        logger.debug("rowGroupInfo path: {} start: {} length {}", readEntryWithPath.getPath(), start, length);
+        i++;
+      }
+    }
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
+  }
+
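+  /**
+   * Uses the AffinityCalculator to record, for each row group, how many of its bytes are stored on each
+   * available endpoint, and accumulates the total number of bytes to be scanned.
+   */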
+  private void calculateEndpointBytes() {
+    long tA = System.nanoTime();
+    AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
+    for (RowGroupInfo e : rowGroupInfos) {
+      ac.setEndpointBytes(e);
+      totalBytes += e.getLength();
+    }
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
+  }
+/*
+  public LinkedList<RowGroupInfo> getRowGroups() {
+    return rowGroups;
+  }
+
+  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
+    this.rowGroups = rowGroups;
+  }
+
+  public static class ParquetFileReadEntry {
+
+    String path;
+
+    public ParquetFileReadEntry(@JsonProperty String path){
+      this.path = path;
+    }
+  }
+  */
+
+  @JsonIgnore
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
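+  // read entry for a single Parquet row group: HDFS path, start offset and length, plus per-endpoint byte
+  // counts used for locality-aware assignment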
+  public static class RowGroupInfo extends ReadEntryFromHDFS {
+
+    private HashMap<DrillbitEndpoint,Long> endpointBytes;
+    private long maxBytes;
+    private int rowGroupIndex;
+
+    @JsonCreator
+    public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
+                        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+      super(path, start, length);
+      this.rowGroupIndex = rowGroupIndex;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    @Override
+    public Size getSize() {
+      // TODO - these values are wrong, I cannot know these until after I read a file
+      return new Size(10, 10);
+    }
+
+    public HashMap<DrillbitEndpoint,Long> getEndpointBytes() {
+      return endpointBytes;
+    }
+
+    public void setEndpointBytes(HashMap<DrillbitEndpoint,Long> endpointBytes) {
+      this.endpointBytes = endpointBytes;
+    }
+
+    public void setMaxBytes(long bytes) {
+      this.maxBytes = bytes;
+    }
+
+    public long getMaxBytes() {
+      return maxBytes;
+    }
+
+    public ParquetRowGroupScan.RowGroupReadEntry getRowGroupReadEntry() {
+      return new ParquetRowGroupScan.RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+    }
+
+    public int getRowGroupIndex() {
+      return this.rowGroupIndex;
+    }
+  }
+
+  private class ParquetReadEntryComparator implements Comparator<RowGroupInfo> {
+    public int compare(RowGroupInfo e1, RowGroupInfo e2) {
+      if (e1.getMaxBytes() == e2.getMaxBytes()) return 0;
+      return (e1.getMaxBytes() > e2.getMaxBytes()) ? 1 : -1;
+    }
+  }
+
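+  /**
+   * Computes the affinity of this scan for each endpoint as the fraction of the total bytes to be scanned
+   * that are stored on that endpoint, summed across all row groups.  The result is cached.
+   */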
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    long tA = System.nanoTime();
+    if (this.endpointAffinities == null) {
+      HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
+      for (RowGroupInfo entry : rowGroupInfos) {
+        for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
+          long bytes = entry.getEndpointBytes().get(d);
+          float affinity = (float)bytes / (float)totalBytes;
+          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
+          if (affinities.containsKey(d)) {
+            affinities.put(d, affinities.get(d) + affinity);
+          } else {
+            affinities.put(d, affinity);
+          }
+        }
+      }
+      List<EndpointAffinity> affinityList = new LinkedList<>();
+      for (DrillbitEndpoint d : affinities.keySet()) {
+        logger.debug("Endpoint {} has affinity {}", d.getAddress(), affinities.get(d).floatValue());
+        affinityList.add(new EndpointAffinity(d,affinities.get(d).floatValue()));
+      }
+      this.endpointAffinities = affinityList;
+    }
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
+    return this.endpointAffinities;
+  }
+
+
+
+
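+  /**
+   * Assigns every row group to one of the given endpoints, building one list of read entries per endpoint.
+   * Row groups are assigned in several passes with decreasing locality requirements (100%, 50%, then 25%
+   * of a row group's maximum local byte count), then without locality requirements, so that every row group
+   * ends up assigned somewhere.
+   */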
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    long tA = System.nanoTime();
+    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
+
+    int i = 0;
+    for (DrillbitEndpoint endpoint : endpoints) {
+      logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
+    }
+
+    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
+    mappings = new LinkedList[endpoints.size()];
+    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
+    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
+    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
+    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
+    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
+    assert unassigned5.size() == 0 : "All readEntries should be assigned by now, but some are still unassigned";
+    long tB = System.nanoTime();
+    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
+  }
+
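+  /**
+   * Makes one assignment pass over the given row groups, round-robining across endpoints and limiting each
+   * endpoint to roughly 1.5x its fair share of entries.  A row group is assigned only if the endpoint holds at
+   * least requiredPercentage percent of that row group's maximum local byte count (and, when mustContain is set,
+   * holds at least some of its bytes); when assignAll is set, remaining row groups are assigned unconditionally.
+   * Returns the row groups that could not be assigned in this pass.
+   */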
+  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
+    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
+    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
+
+    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
+
+    if (maxEntries < 1) maxEntries = 1;
+
+    int i = 0;
+    for(RowGroupInfo e : rowGroups) {
+      boolean assigned = false;
+      for (int j = i; j < i + endpoints.size(); j++) {
+        DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
+        if (assignAll ||
+                (e.getEndpointBytes().size() > 0 &&
+                (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
+                (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
+                e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
+          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
+          if(entries == null){
+            entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
+            mappings[j%endpoints.size()] = entries;
+          }
+          entries.add(e.getRowGroupReadEntry());
+          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
+          assigned = true;
+          break;
+        }
+      }
+      if (!assigned) unassigned.add(e);
+      i++;
+    }
+    return unassigned;
+  }
+
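+  // builds the single-fragment scan for one minor fragment, using the row group assignments produced by
+  // applyAssignments()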
+  @Override
+  public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
+      logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
+    }
+    try {
+      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
+    } catch (SetupException e) {
+      e.printStackTrace(); // TODO - fix this
+    }
+    return null;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return rowGroupInfos.size();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,1,1,1);
+  }
+
+  @Override
+  public Size getSize() {
+    // TODO - this is wrong, need to populate correctly
+    return new Size(10,10);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    //TODO return copy of self
+    return this;
+  }
+
+}