You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:52 UTC
[14/27] Updates to add subscan support to JSON
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
new file mode 100644
index 0000000..fe16b3a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Iterators;
+
+/**
+ * Physical sub-scan operator over a list of JSON scan entries.
+ * The aggregate cost and size are the sums of the per-entry values, computed once at construction.
+ */
+@JsonTypeName("json-sub-scan")
+public class JSONSubScan extends AbstractBase implements SubScan{
+
+  protected final List<JSONGroupScan.ScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+
+  /**
+   * Creates the sub-scan and totals the cost and size of its entries.
+   *
+   * @param readEntries entries to read; bound to the "entries" JSON property
+   */
+  @JsonCreator
+  public JSONSubScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(JSONGroupScan.ScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+  }
+
+  /**
+   * Returns the read entries. Serialized as "entries" so that the JSON form uses the same
+   * property name the creator reads back.
+   */
+  @JsonProperty("entries")
+  public List<JSONGroupScan.ScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /** Returns a new sub-scan over the same entries; the supplied children are not used. */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new JSONSubScan(readEntries);
+  }
+
+  /** Returns the summed cost of all read entries. */
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  /** Returns the summed size of all read entries. */
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  /** Dispatches to {@code visitSubScan} on the visitor. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /** A sub-scan exposes no child operators. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
new file mode 100644
index 0000000..d5f1d8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -0,0 +1,255 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Group scan over generated ("mock") data. {@link #applyAssignments(List)} distributes the read
+ * entries round-robin over the endpoints, and {@link #getSpecificScan(int)} hands each minor
+ * fragment its share as a {@link MockSubScanPOP}.
+ */
+@JsonTypeName("mock-scan")
+public class MockGroupScanPOP extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+  private final String url;
+  protected final List<MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  // Entries per endpoint slot; null until applyAssignments() has run.
+  private LinkedList<MockScanEntry>[] mappings;
+
+  /**
+   * Creates the group scan and totals the cost and size of its entries.
+   *
+   * @param url value of the "url" JSON property; forwarded unchanged to the sub-scans
+   * @param readEntries value of the "entries" JSON property
+   */
+  @JsonCreator
+  public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /** One unit of mock data: a record count plus the column definitions to generate. */
+  public static class MockScanEntry implements ReadEntry {
+
+    private final int records;
+    private final MockColumn[] types;
+    // Sum of TypeHelper.getSize() over all columns, computed once in the constructor.
+    private final int recordSize;
+
+    @JsonCreator
+    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+      this.records = records;
+      this.types = types;
+      int size = 0;
+      for(MockColumn dt : types){
+        size += TypeHelper.getSize(dt.getMajorType());
+      }
+      this.recordSize = size;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    public int getRecords() {
+      return records;
+    }
+
+    public MockColumn[] getTypes() {
+      return types;
+    }
+
+    @Override
+    public Size getSize() {
+      return new Size(records, recordSize);
+    }
+  }
+
+  /** Column definition for mock data; null-valued properties are omitted from the JSON form. */
+  @JsonInclude(Include.NON_NULL)
+  public static class MockColumn{
+    @JsonProperty("type") public MinorType minorType;
+    public String name;
+    public DataMode mode;
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+
+    @JsonCreator
+    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      this.name = name;
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @JsonProperty("type")
+    public MinorType getMinorType() {
+      return minorType;
+    }
+    public String getName() {
+      return name;
+    }
+    public DataMode getMode() {
+      return mode;
+    }
+    public Integer getWidth() {
+      return width;
+    }
+    public Integer getPrecision() {
+      return precision;
+    }
+    public Integer getScale() {
+      return scale;
+    }
+
+    /** Builds the MajorType for this column; width, precision and scale are set only when non-null. */
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Distributes the read entries round-robin over the given endpoints.
+   *
+   * @param endpoints assigned endpoints; there must be at least one when entries exist, and no
+   *                  more endpoints than entries
+   * @throws IllegalArgumentException if either constraint is violated
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    final int width = endpoints.size();
+    Preconditions.checkArgument(width <= getReadEntries().size());
+    // Without this check an empty endpoint list with pending entries indexes into a
+    // zero-length array below.
+    Preconditions.checkArgument(width > 0 || getReadEntries().isEmpty(), "Cannot assign read entries to an empty endpoint list.");
+
+    mappings = new LinkedList[width];
+
+    int i = 0;
+    for(MockScanEntry e : this.getReadEntries()){
+      final int slot = i % width;
+      LinkedList<MockScanEntry> entries = mappings[slot];
+      if(entries == null){
+        entries = new LinkedList<MockScanEntry>();
+        mappings[slot] = entries;
+      }
+      entries.add(e);
+      i++;
+    }
+  }
+
+  /**
+   * Returns the sub-scan holding the entries assigned to one minor fragment.
+   *
+   * @param minorFragmentId index into the assignments made by {@link #applyAssignments(List)}
+   * @throws IllegalStateException if no assignments have been applied yet
+   * @throws IllegalArgumentException if the id is outside the assigned range
+   */
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    // Explicit checks instead of assert: assertions are disabled unless the JVM runs with -ea.
+    Preconditions.checkState(mappings != null, "applyAssignments() must be called before getSpecificScan().");
+    Preconditions.checkArgument(minorFragmentId >= 0 && minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
+    return new MockSubScanPOP(url, mappings[minorFragmentId]);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return readEntries.size();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  /** Returns a new group scan over the same url and entries; {@code children} must be empty. */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockGroupScanPOP(url, readEntries);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
new file mode 100644
index 0000000..024aa21
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * {@link RecordReader} that fills value vectors with generated test data for the columns
+ * described by a {@link MockScanEntry}.
+ */
+public class MockRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+
+  // Divided by the estimated row size to get the records per batch
+  // (presumably a byte budget -- TODO confirm the unit of TypeHelper.getSize()).
+  private static final int BATCH_SIZE_BUDGET = 250000;
+
+  private OutputMutator output;
+  private MockScanEntry config;
+  private FragmentContext context;
+  private ValueVector[] valueVectors;
+  private int recordsRead;
+  private int batchRecordCount;
+
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
+    this.context = context;
+    this.config = config;
+  }
+
+  /** Sums TypeHelper.getSize() over the major types of all columns. */
+  private int getEstimatedRecordSize(MockColumn[] types) {
+    int x = 0;
+    for (int i = 0; i < types.length; i++) {
+      x += TypeHelper.getSize(types[i].getMajorType());
+    }
+    return x;
+  }
+
+  /** Creates a vector for the named field and allocates it for {@code length} values. */
+  private ValueVector getVector(String name, MajorType type, int length) {
+    assert context != null : "Context shouldn't be null.";
+    MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
+    ValueVector v;
+    v = TypeHelper.getNewVector(f, context.getAllocator());
+    AllocationHelper.allocate(v, length, 50, 4);
+
+    return v;
+  }
+
+  /**
+   * Creates one vector per configured column, registers each with the output mutator and
+   * announces the new schema.
+   *
+   * @throws ExecutionSetupException if the mutator rejects the schema change
+   */
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      this.output = output;
+      MockColumn[] columns = config.getTypes();
+      int estimateRowSize = getEstimatedRecordSize(columns);
+      valueVectors = new ValueVector[columns.length];
+      // A zero estimate (e.g. no columns) would divide by zero, and an estimate above the
+      // budget would give zero-record batches so next() could never make progress.
+      batchRecordCount = Math.max(1, BATCH_SIZE_BUDGET / Math.max(1, estimateRowSize));
+
+      for (int i = 0; i < columns.length; i++) {
+        valueVectors[i] = getVector(columns[i].getName(), columns[i].getMajorType(), batchRecordCount);
+        output.addField(valueVectors[i]);
+      }
+      output.setNewSchema();
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up fields", e);
+    }
+  }
+
+  /**
+   * Generates the next batch of test data.
+   *
+   * @return the number of records generated; 0 once all configured records have been produced
+   */
+  @Override
+  public int next() {
+    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
+
+    recordsRead += recordSetSize;
+    for(ValueVector v : valueVectors){
+      AllocationHelper.allocate(v, recordSetSize, 50, 5);
+
+      logger.debug("MockRecordReader: Generating random data for VV of type {}", v.getClass().getName());
+      ValueVector.Mutator m = v.getMutator();
+      m.setValueCount(recordSetSize);
+      m.generateTestData();
+    }
+    return recordSetSize;
+  }
+
+  /** Removes every vector created by setup() from the output and closes it. */
+  @Override
+  public void cleanup() {
+    if (valueVectors == null) {
+      return; // setup() never got as far as creating the vector array
+    }
+    for (ValueVector v : valueVectors) {
+      if (v == null) {
+        continue; // setup() failed before this slot was filled
+      }
+      try {
+        output.removeField(v.getField());
+      } catch (SchemaChangeException e) {
+        logger.warn("Failure while trying to remove field.", e);
+      }
+      v.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
new file mode 100644
index 0000000..5c91e1c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/** Creates the scan batch for a {@link MockSubScanPOP}, with one {@link MockRecordReader} per read entry. */
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+
+  /**
+   * Builds a {@link ScanBatch} over readers for every entry of the sub-scan.
+   *
+   * @param context fragment context handed to each reader and to the batch
+   * @param config sub-scan whose read entries are turned into readers
+   * @param children must be empty
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    final List<RecordReader> recordReaders = Lists.newArrayList();
+    for (final MockScanEntry scanEntry : config.getReadEntries()) {
+      final RecordReader reader = new MockRecordReader(context, scanEntry);
+      recordReaders.add(reader);
+    }
+    return new ScanBatch(context, recordReaders.iterator());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 23ac2b8..1ea6958 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -22,10 +22,9 @@ import java.util.ArrayList;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.storage.MockStorageEngineConfig;
import com.fasterxml.jackson.core.type.TypeReference;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
new file mode 100644
index 0000000..4dbcd63
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/** Mock store operator: wraps a single child and records no endpoint assignments. */
+@JsonTypeName("mock-store")
+public class MockStorePOP extends AbstractStore {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
+
+  /**
+   * @param child operator bound to the "child" JSON property
+   */
+  @JsonCreator
+  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  /** Always 1. */
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  /** No endpoint affinity: always the empty list. */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /** No-op; the assigned endpoints are not recorded. */
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    // nothing to record
+  }
+
+  /** Returns a new mock store over {@code child}; the fragment id is not used. */
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new MockStorePOP(child);
+  }
+
+  /** Same construction as getSpecificStore(): a new mock store over {@code child}. */
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new MockStorePOP(child);
+  }
+
+  /** Cost whose second component is record count times record size of getSize(). */
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(
+        1,
+        getSize().getRecordCount() * getSize().getRecordSize(),
+        1,
+        1);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
new file mode 100644
index 0000000..38bf337
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -0,0 +1,126 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Sub-scan over the mock read entries assigned to one minor fragment
+ * (created by MockGroupScanPOP.getSpecificScan()).
+ */
+@JsonTypeName("mock-sub-scan")
+public class MockSubScanPOP extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockSubScanPOP.class);
+
+  private final String url;
+  protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+
+  /**
+   * Creates the sub-scan and totals the cost and size of its entries.
+   *
+   * @param url value of the "url" JSON property
+   * @param readEntries value of the "entries" JSON property
+   */
+  @JsonCreator
+  public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockGroupScanPOP.MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /** A sub-scan exposes no child operators. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** Returns the summed cost of all read entries, as computed in the constructor. */
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  /** Returns the summed size of all read entries, as computed in the constructor. */
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  // TODO: replace isExecutable() and accept() with a shared AbstractSubScan base
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /** Returns a new sub-scan over the same url and entries; {@code children} must be empty. */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockSubScanPOP(url, readEntries);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 8b4f760..99b65e6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -62,7 +62,7 @@ public abstract class ColumnReader {
ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, ValueVector v){
this.parentReader = parentReader;
- if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, (BaseDataValueVector) v);
+ if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, v);
else valueVecHolder = new VectorHolder(5000, (BaseDataValueVector) v);
columnDescriptor = descriptor;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 29d9cc7..0378960 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -36,7 +36,10 @@ public final class PageReadStatus {
// store references to the pages that have been uncompressed, but not copied to ValueVectors yet
Page currentPage;
// buffer to store bytes of current page, set to max size of parquet page
- byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+ // TODO: add this back once toByteArray accepts an input. byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
+ byte[] pageDataByteArray;
+
+
PageReader pageReader;
// read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
long readPosInBytes;
@@ -103,11 +106,13 @@ public final class PageReadStatus {
}
// if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
- if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
- pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
- }
+// if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
+// pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
+// }
// TODO - would like to get this into the mainline, hopefully before alpha
- currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
+ pageDataByteArray = currentPage.getBytes().toByteArray();
+ //TODO: Fix once parquet supports buffer work or at least passing in array.
+ //pageDataByteArray = currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength);
readPosInBytes = 0;
valuesRead = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index f4988a0..66c1550 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.SetupException;
@@ -36,15 +37,17 @@ import org.apache.drill.exec.physical.ReadEntryWithPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Preconditions;
+
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StorageEngineRegistry;
import org.apache.drill.exec.store.AffinityCalculator;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bd63406..03fb4ec 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -24,17 +24,18 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanBatchCreator;
-
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockScanBatchCreator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index c9d6967..2d9524d 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -11,6 +11,7 @@ import java.util.List;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+
import mockit.Expectations;
import mockit.Injectable;
@@ -23,6 +24,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.proto.SchemaDefProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.json.JSONRecordReader;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Ignore;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 0e31cdd..5628f50 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos;
@@ -72,40 +71,30 @@ public class ParquetRecordReaderTest {
new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent();
}
+
@Test
public void testMultipleRowGroupsAndReadsEvent() throws Exception {
String planName = "/parquet_scan_screen.json";
- String fileName = "/tmp/testParquetFile_many_types_3";
+ String fileName = "/tmp/parquet_test_file_many_types";
int numberRowGroups = 20;
int recordsPerRowGroup = 300000;
- //TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
+ File f = new File(fileName);
+ if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
}
private class ParquetResultListener implements UserResultsListener {
- private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
private SettableFuture<Void> future = SettableFuture.create();
- int count = 0;
RecordBatchLoader batchLoader;
- byte[] bytes;
- int numberRowGroups;
- int numberOfTimesRead;
int batchCounter = 1;
- int columnValCounter = 0;
- int i = 0;
- private FieldInfo currentField;
private final HashMap<String, Long> valuesChecked = new HashMap<>();
- private final int recordsPerRowGroup;
private final Map<String, FieldInfo> fields;
private final long totalRecords;
ParquetResultListener(int recordsPerRowGroup, RecordBatchLoader batchLoader, int numberRowGroups, int numberOfTimesRead){
this.batchLoader = batchLoader;
this.fields = TestFileGenerator.getFieldMap(recordsPerRowGroup);
- this.recordsPerRowGroup = recordsPerRowGroup;
- this.numberRowGroups = numberRowGroups;
- this.numberOfTimesRead = numberOfTimesRead;
this.totalRecords = recordsPerRowGroup * numberRowGroups * numberOfTimesRead;
}
@@ -120,7 +109,7 @@ public class ParquetRecordReaderTest {
long columnValCounter = 0;
int i = 0;
FieldInfo currentField;
- count += result.getHeader().getRowCount();
+
boolean schemaChanged = false;
try {
schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
@@ -128,12 +117,11 @@ public class ParquetRecordReaderTest {
logger.error("Failure while loading batch", e);
}
- int recordCount = 0;
// print headers.
if (schemaChanged) {
} // do not believe any change is needed for when the schema changes, with the current mock scan use case
- for (VectorWrapper vw : batchLoader) {
+ for (VectorWrapper<?> vw : batchLoader) {
ValueVector vv = vw.getValueVector();
currentField = fields.get(vv.getField().getName());
if (VERBOSE_DEBUG){
@@ -163,7 +151,6 @@ public class ParquetRecordReaderTest {
if (VERBOSE_DEBUG){
for (i = 0; i < batchLoader.getRecordCount(); i++) {
- recordCount++;
if (i % 50 == 0){
System.out.println();
for (VectorWrapper<?> vw : batchLoader) {
@@ -298,11 +285,6 @@ public class ParquetRecordReaderTest {
@SuppressWarnings("unchecked")
private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) {
-// UserBitShared.FieldMetadata metadata = valueVector.getMetadata();
-// SchemaDefProtos.FieldDef def = metadata.getDef();
-// assertEquals(expectedMinorType, def.getMajorType().getMinorType());
-// assertEquals(name, def.getNameList().get(0).getName());
-// assertEquals(parentFieldId, def.getParentId());
if (expectedMinorType == TypeProtos.MinorType.MAP) {
return;
@@ -339,9 +321,6 @@ public class ParquetRecordReaderTest {
assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
}
- private String getResource(String resourceName) {
- return "resource:" + resourceName;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
index 15d3936..29cab68 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
@@ -22,10 +22,10 @@
storageengine:"parquet",
selection: [
{
- path: "/tmp/testParquetFile_many_types_3"
+ path: "/tmp/parquet_test_file_many_types"
},
{
- path: "/tmp/testParquetFile_many_types_3"
+ path: "/tmp/parquet_test_file_many_types"
}
]
},
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 79b8ef8..35a2414 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -3,6 +3,7 @@ package org.apache.drill.jdbc.test;
import java.io.IOException;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -12,6 +13,7 @@ import org.junit.rules.Timeout;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
+@Ignore
public class FullEngineTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FullEngineTest.class);