You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:52 UTC

[14/27] Updates to add subscan support to JSON

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
new file mode 100644
index 0000000..fe16b3a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Iterators;
+
+@JsonTypeName("json-sub-scan")
+public class JSONSubScan extends AbstractBase implements SubScan{
+
+    protected final List<JSONGroupScan.ScanEntry> readEntries;
+    private final OperatorCost cost;
+    private final Size size;
+    
+    @JsonCreator
+    public JSONSubScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
+        this.readEntries = readEntries;
+        OperatorCost cost = new OperatorCost(0,0,0,0);
+        Size size = new Size(0,0);
+        for(JSONGroupScan.ScanEntry r : readEntries){
+          cost = cost.add(r.getCost());
+          size = size.add(r.getSize());
+        }
+        this.cost = cost;
+        this.size = size;
+    }
+
+    public List<JSONGroupScan.ScanEntry> getReadEntries() {
+      return readEntries;
+    }
+
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+        return new JSONSubScan(readEntries);
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return cost;
+    }
+
+    @Override
+    public Size getSize() {
+      return size;
+    }
+
+    @Override
+    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+      return physicalVisitor.visitSubScan(this, value);
+    }
+
+    @Override
+    public Iterator<PhysicalOperator> iterator() {
+      return Iterators.emptyIterator();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
new file mode 100644
index 0000000..d5f1d8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -0,0 +1,221 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("mock-scan")
+public class MockGroupScanPOP extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+  private final String url;
+  protected final List<MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  private  LinkedList<MockScanEntry>[] mappings;
+
+  @JsonCreator
+  public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+  
+  public static class MockScanEntry implements ReadEntry {
+
+    private final int records;
+    private final MockColumn[] types;
+    private final int recordSize;
+    
+
+    @JsonCreator
+    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+      this.records = records;
+      this.types = types;
+      int size = 0;
+      for(MockColumn dt : types){
+        size += TypeHelper.getSize(dt.getMajorType());
+      }
+      this.recordSize = size;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+    
+    public int getRecords() {
+      return records;
+    }
+
+    public MockColumn[] getTypes() {
+      return types;
+    }
+
+    @Override
+    public Size getSize() {
+      return new Size(records, recordSize);
+    }
+  }
+  
+  @JsonInclude(Include.NON_NULL)
+  public static class MockColumn{
+    @JsonProperty("type") public MinorType minorType;
+    public String name;
+    public DataMode mode;
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+    
+    
+    @JsonCreator
+    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      this.name = name;
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+    
+    @JsonProperty("type")
+    public MinorType getMinorType() {
+      return minorType;
+    }
+    public String getName() {
+      return name;
+    }
+    public DataMode getMode() {
+      return mode;
+    }
+    public Integer getWidth() {
+      return width;
+    }
+    public Integer getPrecision() {
+      return precision;
+    }
+    public Integer getScale() {
+      return scale;
+    }
+    
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+    
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
+    
+    mappings = new LinkedList[endpoints.size()];
+
+    int i =0;
+    for(MockScanEntry e : this.getReadEntries()){
+      if(i == endpoints.size()) i -= endpoints.size();
+      LinkedList<MockScanEntry> entries = mappings[i];
+      if(entries == null){
+        entries = new LinkedList<MockScanEntry>();
+        mappings[i] = entries;
+      }
+      entries.add(e);
+      i++;
+    }
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+    return new MockSubScanPOP(url, mappings[minorFragmentId]);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return readEntries.size();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockGroupScanPOP(url, readEntries);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
new file mode 100644
index 0000000..024aa21
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -0,0 +1,118 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class MockRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+
+  private OutputMutator output;
+  private MockScanEntry config;
+  private FragmentContext context;
+  private ValueVector[] valueVectors;
+  private int recordsRead;
+  private int batchRecordCount;
+
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
+    this.context = context;
+    this.config = config;
+  }
+
+  private int getEstimatedRecordSize(MockColumn[] types) {
+    int x = 0;
+    for (int i = 0; i < types.length; i++) {
+      x += TypeHelper.getSize(types[i].getMajorType());
+    }
+    return x;
+  }
+
+  private ValueVector getVector(String name, MajorType type, int length) {
+    assert context != null : "Context shouldn't be null.";
+    MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
+    ValueVector v;
+    v = TypeHelper.getNewVector(f, context.getAllocator());
+    AllocationHelper.allocate(v, length, 50, 4);
+    
+    return v;
+
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      this.output = output;
+      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+      valueVectors = new ValueVector[config.getTypes().length];
+      batchRecordCount = 250000 / estimateRowSize;
+
+      for (int i = 0; i < config.getTypes().length; i++) {
+        valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
+        output.addField(valueVectors[i]);
+      }
+      output.setNewSchema();
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up fields", e);
+    }
+
+  }
+
+  @Override
+  public int next() {
+    
+    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
+
+    recordsRead += recordSetSize;
+    for(ValueVector v : valueVectors){
+      AllocationHelper.allocate(v, recordSetSize, 50, 5);
+      
+      logger.debug("MockRecordReader:  Generating random data for VV of type " + v.getClass().getName());
+      ValueVector.Mutator m = v.getMutator();
+      m.setValueCount(recordSetSize);
+      m.generateTestData();
+      
+    }
+    return recordSetSize;
+  }
+
+  @Override
+  public void cleanup() {
+    for (int i = 0; i < valueVectors.length; i++) {
+      try {
+        output.removeField(valueVectors[i].getField());
+      } catch (SchemaChangeException e) {
+        logger.warn("Failure while trying to remove field.", e);
+      }
+      valueVectors[i].close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
new file mode 100644
index 0000000..5c91e1c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<MockScanEntry> entries = config.getReadEntries();
+    List<RecordReader> readers = Lists.newArrayList();
+    for(MockScanEntry e : entries){
+      readers.add(new MockRecordReader(context, e));
+    }
+    return new ScanBatch(context, readers.iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 23ac2b8..1ea6958 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -22,10 +22,9 @@ import java.util.ArrayList;
 
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.storage.MockStorageEngineConfig;
 
 import com.fasterxml.jackson.core.type.TypeReference;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
new file mode 100644
index 0000000..4dbcd63
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("mock-store")
+public class MockStorePOP extends AbstractStore {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
+
+  @JsonCreator
+  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    
+  }
+
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new MockStorePOP(child);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new MockStorePOP(child);
+  }
+
+
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
new file mode 100644
index 0000000..38bf337
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("mock-sub-scan")
+public class MockSubScanPOP extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+  private final String url;
+  protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  private  LinkedList<MockGroupScanPOP.MockScanEntry>[] mappings;
+
+  @JsonCreator
+  public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockGroupScanPOP.MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Size getSize() {
+    throw new UnsupportedOperationException();
+  }
+
+  // will want to replace these two methods with an interface above for AbstractSubScan
+  @Override
+  public boolean isExecutable() {
+    return true;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSubScan(this, value);
+  }
+  // see comment above about replacing this
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockSubScanPOP(url, readEntries);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 8b4f760..99b65e6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -62,7 +62,7 @@ public abstract class ColumnReader {
   ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                boolean fixedLength, ValueVector v){
     this.parentReader = parentReader;
-    if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, (BaseDataValueVector) v);
+    if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, v);
     else valueVecHolder = new VectorHolder(5000, (BaseDataValueVector) v);
 
     columnDescriptor = descriptor;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 29d9cc7..0378960 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -36,7 +36,10 @@ public final class PageReadStatus {
   // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
   Page currentPage;
   // buffer to store bytes of current page, set to max size of parquet page
-  byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+  // TODO: add this back once toByteArray accepts an input.  byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+  byte[] pageDataByteArray;
+  
+  
   PageReader pageReader;
   // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
   long readPosInBytes;
@@ -103,11 +106,13 @@ public final class PageReadStatus {
     }
 
     // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
-    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
-      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
-    }
+//    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
+//      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
+//    }
     // TODO - would like to get this into the mainline, hopefully before alpha
-    currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
+    pageDataByteArray = currentPage.getBytes().toByteArray();
+    //TODO: Fix once parquet supports buffer work or at least passing in array.
+    //pageDataByteArray = currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
 
     readPosInBytes = 0;
     valuesRead = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index f4988a0..66c1550 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.SetupException;
@@ -36,15 +37,17 @@ import org.apache.drill.exec.physical.ReadEntryWithPath;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StorageEngineRegistry;
 import org.apache.drill.exec.store.AffinityCalculator;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bd63406..03fb4ec 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -24,17 +24,18 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanBatchCreator;
-
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockScanBatchCreator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index c9d6967..2d9524d 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -11,6 +11,7 @@ import java.util.List;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+
 import mockit.Expectations;
 import mockit.Injectable;
 
@@ -23,6 +24,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.proto.SchemaDefProtos;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.json.JSONRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Ignore;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 0e31cdd..5628f50 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Vector;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.types.TypeProtos;
@@ -72,40 +71,30 @@ public class ParquetRecordReaderTest {
     new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent();
   }
 
+ 
   @Test
   public void testMultipleRowGroupsAndReadsEvent() throws Exception {
     String planName = "/parquet_scan_screen.json";
-    String fileName = "/tmp/testParquetFile_many_types_3";
+    String fileName = "/tmp/parquet_test_file_many_types";
     int numberRowGroups = 20;
     int recordsPerRowGroup = 300000;
-    //TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
+    File f = new File(fileName);
+    if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
     testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
   }
 
   private class ParquetResultListener implements UserResultsListener {
-    private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
     private SettableFuture<Void> future = SettableFuture.create();
-    int count = 0;
     RecordBatchLoader batchLoader;
-    byte[] bytes;
 
-    int numberRowGroups;
-    int numberOfTimesRead;
     int batchCounter = 1;
-    int columnValCounter = 0;
-    int i = 0;
-    private FieldInfo currentField;
     private final HashMap<String, Long> valuesChecked = new HashMap<>();
-    private final int recordsPerRowGroup;
     private final Map<String, FieldInfo> fields;
     private final long totalRecords;
     
     ParquetResultListener(int recordsPerRowGroup, RecordBatchLoader batchLoader, int numberRowGroups, int numberOfTimesRead){
       this.batchLoader = batchLoader;
       this.fields = TestFileGenerator.getFieldMap(recordsPerRowGroup);
-      this.recordsPerRowGroup = recordsPerRowGroup;
-      this.numberRowGroups = numberRowGroups;
-      this.numberOfTimesRead = numberOfTimesRead;
       this.totalRecords = recordsPerRowGroup * numberRowGroups * numberOfTimesRead;
     }
 
@@ -120,7 +109,7 @@ public class ParquetRecordReaderTest {
       long columnValCounter = 0;
       int i = 0;
       FieldInfo currentField;
-      count += result.getHeader().getRowCount();
+
       boolean schemaChanged = false;
       try {
         schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
@@ -128,12 +117,11 @@ public class ParquetRecordReaderTest {
         logger.error("Failure while loading batch", e);
       }
 
-      int recordCount = 0;
       // print headers.
       if (schemaChanged) {
       } // do not believe any change is needed for when the schema changes, with the current mock scan use case
 
-      for (VectorWrapper vw : batchLoader) {
+      for (VectorWrapper<?> vw : batchLoader) {
         ValueVector vv = vw.getValueVector();
         currentField = fields.get(vv.getField().getName());
         if (VERBOSE_DEBUG){
@@ -163,7 +151,6 @@ public class ParquetRecordReaderTest {
       
       if (VERBOSE_DEBUG){
         for (i = 0; i < batchLoader.getRecordCount(); i++) {
-          recordCount++;
           if (i % 50 == 0){
             System.out.println();
             for (VectorWrapper<?> vw : batchLoader) {
@@ -298,11 +285,6 @@ public class ParquetRecordReaderTest {
 
   @SuppressWarnings("unchecked")
   private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) {
-//    UserBitShared.FieldMetadata metadata = valueVector.getMetadata();
-//    SchemaDefProtos.FieldDef def = metadata.getDef();
-//    assertEquals(expectedMinorType, def.getMajorType().getMinorType());
-//    assertEquals(name, def.getNameList().get(0).getName());
-//    assertEquals(parentFieldId, def.getParentId());
 
     if (expectedMinorType == TypeProtos.MinorType.MAP) {
       return;
@@ -339,9 +321,6 @@ public class ParquetRecordReaderTest {
     assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
   }
 
-  private String getResource(String resourceName) {
-    return "resource:" + resourceName;
-  }
 
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
index 15d3936..29cab68 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
@@ -22,10 +22,10 @@
       storageengine:"parquet",
       selection: [
         {
-            path: "/tmp/testParquetFile_many_types_3"
+            path: "/tmp/parquet_test_file_many_types"
         },
         {
-            path: "/tmp/testParquetFile_many_types_3"
+            path: "/tmp/parquet_test_file_many_types"
         }
       ]
     },

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 79b8ef8..35a2414 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -3,6 +3,7 @@ package org.apache.drill.jdbc.test;
 import java.io.IOException;
 
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -12,6 +13,7 @@ import org.junit.rules.Timeout;
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 
+@Ignore
 public class FullEngineTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FullEngineTest.class);