You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:53 UTC
[15/27] git commit: Updates to add subscan support to JSON
Updates to add subscan support to JSON
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2884db7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2884db7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2884db7a
Branch: refs/heads/master
Commit: 2884db7a9162f9d348dbb88663e0da191a399c4d
Parents: 402be7e
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 17:57:24 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 17:57:24 2013 -0700
----------------------------------------------------------------------
sandbox/prototype/exec/java-exec/pom.xml | 15 +-
.../apache/drill/exec/opt/BasicOptimizer.java | 2 +-
.../physical/config/JSONScanBatchCreator.java | 46 --
.../drill/exec/physical/config/JSONScanPOP.java | 114 -----
.../exec/physical/config/MockGroupScanPOP.java | 221 ---------
.../exec/physical/config/MockRecordReader.java | 118 -----
.../physical/config/MockScanBatchCreator.java | 46 --
.../exec/physical/config/MockStorePOP.java | 75 ---
.../exec/physical/config/MockSubScanPOP.java | 115 -----
.../drill/exec/physical/impl/ImplCreator.java | 56 +--
.../drill/exec/store/JSONRecordReader.java | 486 ------------------
.../apache/drill/exec/store/VectorHolder.java | 21 +-
.../drill/exec/store/json/JSONGroupScan.java | 145 ++++++
.../drill/exec/store/json/JSONRecordReader.java | 489 +++++++++++++++++++
.../exec/store/json/JSONScanBatchCreator.java | 46 ++
.../drill/exec/store/json/JSONSubScan.java | 86 ++++
.../drill/exec/store/mock/MockGroupScanPOP.java | 221 +++++++++
.../drill/exec/store/mock/MockRecordReader.java | 118 +++++
.../exec/store/mock/MockScanBatchCreator.java | 46 ++
.../exec/store/mock/MockStorageEngine.java | 3 +-
.../drill/exec/store/mock/MockStorePOP.java | 75 +++
.../drill/exec/store/mock/MockSubScanPOP.java | 115 +++++
.../drill/exec/store/parquet/ColumnReader.java | 2 +-
.../exec/store/parquet/PageReadStatus.java | 15 +-
.../exec/store/parquet/ParquetGroupScan.java | 5 +-
.../store/parquet/ParquetScanBatchCreator.java | 5 +-
.../drill/exec/store/JSONRecordReaderTest.java | 2 +
.../store/parquet/ParquetRecordReaderTest.java | 33 +-
.../src/test/resources/parquet_scan_screen.json | 4 +-
.../apache/drill/jdbc/test/FullEngineTest.java | 2 +
30 files changed, 1416 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index a2e8501..b36208c 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -46,12 +46,12 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
- <version>1.0.1-SNAPSHOT</version>
+ <version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop</artifactId>
- <version>1.0.1-SNAPSHOT</version>
+ <version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
@@ -106,7 +106,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>1.0.3-mapr-2.1.2.1</version>
+ <version>1.2.1</version>
<exclusions>
<exclusion>
<artifactId>jets3t</artifactId>
@@ -291,12 +291,5 @@
</build>
- <repositories>
- <repository>
- <id>mapr-releases</id>
- <url>http://repository.mapr.com/maven/</url>
- <snapshots><enabled>false</enabled></snapshots>
- <releases><enabled>true</enabled></releases>
- </repository>
- </repositories>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index b5eea03..2c2a342 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -28,10 +28,10 @@ import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
import com.fasterxml.jackson.core.type.TypeReference;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
deleted file mode 100644
index f93f03b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.config;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.JSONRecordReader;
-import org.apache.drill.exec.store.RecordReader;
-
-import java.util.List;
-
-public class JSONScanBatchCreator implements BatchCreator<JSONScanPOP> {
-
- @Override
- public RecordBatch getBatch(FragmentContext context, JSONScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- List<JSONScanPOP.ScanEntry> entries = config.getReadEntries();
- List<RecordReader> readers = Lists.newArrayList();
- for (JSONScanPOP.ScanEntry e : entries) {
- readers.add(new JSONRecordReader(context, e.getUrl()));
- }
-
- return new ScanBatch(context, readers.iterator());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
deleted file mode 100644
index 1dcf5e1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.config;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.proto.CoordinationProtos;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@JsonTypeName("json-scan")
-public class JSONScanPOP extends AbstractScan<JSONScanPOP.ScanEntry> {
- private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
-
- private LinkedList[] mappings;
-
- @JsonCreator
- public JSONScanPOP(@JsonProperty("entries") List<JSONScanPOP.ScanEntry> readEntries) {
- super(readEntries);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
- checkArgument(endpoints.size() <= getReadEntries().size());
-
- mappings = new LinkedList[endpoints.size()];
-
- int i = 0;
- for (ScanEntry e : this.getReadEntries()) {
- if (i == endpoints.size()) i = 0;
- LinkedList entries = mappings[i];
- if (entries == null) {
- entries = new LinkedList<>();
- mappings[i] = entries;
- }
- entries.add(e);
- i++;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Scan<?> getSpecificScan(int minorFragmentId) {
- checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
- return new JSONScanPOP(mappings[minorFragmentId]);
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- return new JSONScanPOP(readEntries);
- }
-
- public static class ScanEntry implements ReadEntry {
- private final String url;
- private Size size;
-
- @JsonCreator
- public ScanEntry(@JsonProperty("url") String url) {
- this.url = url;
- long fileLength = new File(URI.create(url)).length();
- size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
- }
-
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1, 1, 2, 2);
- }
-
- @Override
- public Size getSize() {
- return size;
- }
-
- public String getUrl() {
- return url;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
deleted file mode 100644
index a28c7d8..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-scan")
-public class MockGroupScanPOP extends AbstractGroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
-
- private final String url;
- protected final List<MockScanEntry> readEntries;
- private final OperatorCost cost;
- private final Size size;
- private LinkedList<MockScanEntry>[] mappings;
-
- @JsonCreator
- public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
- this.readEntries = readEntries;
- OperatorCost cost = new OperatorCost(0,0,0,0);
- Size size = new Size(0,0);
- for(MockScanEntry r : readEntries){
- cost = cost.add(r.getCost());
- size = size.add(r.getSize());
- }
- this.cost = cost;
- this.size = size;
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-
- @JsonProperty("entries")
- public List<MockScanEntry> getReadEntries() {
- return readEntries;
- }
-
- public static class MockScanEntry implements ReadEntry {
-
- private final int records;
- private final MockColumn[] types;
- private final int recordSize;
-
-
- @JsonCreator
- public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
- this.records = records;
- this.types = types;
- int size = 0;
- for(MockColumn dt : types){
- size += TypeHelper.getSize(dt.getMajorType());
- }
- this.recordSize = size;
- }
-
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1, 2, 1, 1);
- }
-
- public int getRecords() {
- return records;
- }
-
- public MockColumn[] getTypes() {
- return types;
- }
-
- @Override
- public Size getSize() {
- return new Size(records, recordSize);
- }
- }
-
- @JsonInclude(Include.NON_NULL)
- public static class MockColumn{
- @JsonProperty("type") public MinorType minorType;
- public String name;
- public DataMode mode;
- public Integer width;
- public Integer precision;
- public Integer scale;
-
-
- @JsonCreator
- public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
- this.name = name;
- this.minorType = minorType;
- this.mode = mode;
- this.width = width;
- this.precision = precision;
- this.scale = scale;
- }
-
- @JsonProperty("type")
- public MinorType getMinorType() {
- return minorType;
- }
- public String getName() {
- return name;
- }
- public DataMode getMode() {
- return mode;
- }
- public Integer getWidth() {
- return width;
- }
- public Integer getPrecision() {
- return precision;
- }
- public Integer getScale() {
- return scale;
- }
-
- @JsonIgnore
- public MajorType getMajorType(){
- MajorType.Builder b = MajorType.newBuilder();
- b.setMode(mode);
- b.setMinorType(minorType);
- if(precision != null) b.setPrecision(precision);
- if(width != null) b.setWidth(width);
- if(scale != null) b.setScale(scale);
- return b.build();
- }
-
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
- Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-
- mappings = new LinkedList[endpoints.size()];
-
- int i =0;
- for(MockScanEntry e : this.getReadEntries()){
- if(i == endpoints.size()) i -= endpoints.size();
- LinkedList<MockScanEntry> entries = mappings[i];
- if(entries == null){
- entries = new LinkedList<MockScanEntry>();
- mappings[i] = entries;
- }
- entries.add(e);
- i++;
- }
- }
-
- @Override
- public SubScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
- return new MockSubScanPOP(url, mappings[minorFragmentId]);
- }
-
- @Override
- public int getMaxParallelizationWidth() {
- return readEntries.size();
- }
-
- @Override
- public OperatorCost getCost() {
- return cost;
- }
-
- @Override
- public Size getSize() {
- return size;
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new MockGroupScanPOP(url, readEntries);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
deleted file mode 100644
index bd57823..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockColumn;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class MockRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
- private OutputMutator output;
- private MockScanEntry config;
- private FragmentContext context;
- private ValueVector[] valueVectors;
- private int recordsRead;
- private int batchRecordCount;
-
- public MockRecordReader(FragmentContext context, MockScanEntry config) {
- this.context = context;
- this.config = config;
- }
-
- private int getEstimatedRecordSize(MockColumn[] types) {
- int x = 0;
- for (int i = 0; i < types.length; i++) {
- x += TypeHelper.getSize(types[i].getMajorType());
- }
- return x;
- }
-
- private ValueVector getVector(String name, MajorType type, int length) {
- assert context != null : "Context shouldn't be null.";
- MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
- ValueVector v;
- v = TypeHelper.getNewVector(f, context.getAllocator());
- AllocationHelper.allocate(v, length, 50, 4);
-
- return v;
-
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- try {
- this.output = output;
- int estimateRowSize = getEstimatedRecordSize(config.getTypes());
- valueVectors = new ValueVector[config.getTypes().length];
- batchRecordCount = 250000 / estimateRowSize;
-
- for (int i = 0; i < config.getTypes().length; i++) {
- valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
- output.addField(valueVectors[i]);
- }
- output.setNewSchema();
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException("Failure while setting up fields", e);
- }
-
- }
-
- @Override
- public int next() {
-
- int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
-
- recordsRead += recordSetSize;
- for(ValueVector v : valueVectors){
- AllocationHelper.allocate(v, recordSetSize, 50, 5);
-
- logger.debug("MockRecordReader: Generating random data for VV of type " + v.getClass().getName());
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(recordSetSize);
- m.generateTestData();
-
- }
- return recordSetSize;
- }
-
- @Override
- public void cleanup() {
- for (int i = 0; i < valueVectors.length; i++) {
- try {
- output.removeField(valueVectors[i].getField());
- } catch (SchemaChangeException e) {
- logger.warn("Failure while trying to remove field.", e);
- }
- valueVectors[i].close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
deleted file mode 100644
index a06aaee..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- List<MockScanEntry> entries = config.getReadEntries();
- List<RecordReader> readers = Lists.newArrayList();
- for(MockScanEntry e : entries){
- readers.add(new MockRecordReader(context, e));
- }
- return new ScanBatch(context, readers.iterator());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
deleted file mode 100644
index 639d0d2..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractStore;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Store;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-store")
-public class MockStorePOP extends AbstractStore {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
-
- @JsonCreator
- public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
- super(child);
- }
-
- public int getMaxWidth() {
- return 1;
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-
- }
-
- @Override
- public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
- return new MockStorePOP(child);
- }
-
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
- }
-
- @Override
- protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new MockStorePOP(child);
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
deleted file mode 100644
index 7380617..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.google.common.collect.Iterators;
-import org.apache.drill.common.graph.GraphVisitor;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-sub-scan")
-public class MockSubScanPOP extends AbstractBase implements SubScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
-
- private final String url;
- protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
- private final OperatorCost cost;
- private final Size size;
- private LinkedList<MockGroupScanPOP.MockScanEntry>[] mappings;
-
- @JsonCreator
- public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
- this.readEntries = readEntries;
- OperatorCost cost = new OperatorCost(0,0,0,0);
- Size size = new Size(0,0);
- for(MockGroupScanPOP.MockScanEntry r : readEntries){
- cost = cost.add(r.getCost());
- size = size.add(r.getSize());
- }
- this.cost = cost;
- this.size = size;
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-
- @JsonProperty("entries")
- public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
- return readEntries;
- }
-
- @Override
- public Iterator<PhysicalOperator> iterator() {
- return Iterators.emptyIterator();
- }
-
- @Override
- public OperatorCost getCost() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Size getSize() {
- throw new UnsupportedOperationException();
- }
-
- // will want to replace these two methods with an interface above for AbstractSubScan
- @Override
- public boolean isExecutable() {
- return true; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
- return physicalVisitor.visitSubScan(this, value);
- }
- // see comment above about replacing this
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new MockSubScanPOP(url, readEntries);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 61c9383..0a329d6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -25,30 +25,31 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.MockScanBatchCreator;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.config.*;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.json.JSONScanBatchCreator;
+import org.apache.drill.exec.store.json.JSONSubScan;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
+import org.apache.drill.exec.store.mock.MockScanBatchCreator;
+import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
private MockScanBatchCreator msc = new MockScanBatchCreator();
@@ -62,9 +63,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private SortBatchCreator sbc = new SortBatchCreator();
private RootExec root = null;
- private ImplCreator(){}
+ private ImplCreator() {
+ }
- public RootExec getRoot(){
+ public RootExec getRoot() {
return root;
}
@@ -78,20 +80,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
Preconditions.checkNotNull(subScan);
Preconditions.checkNotNull(context);
- if(subScan instanceof MockSubScanPOP){
+ if (subScan instanceof MockSubScanPOP) {
return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
-
- if(scan instanceof MockScanPOP){
- return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch>emptyList());
- } else if(scan instanceof JSONScanPOP) {
- return new JSONScanBatchCreator().getBatch(context, (JSONScanPOP)scan, Collections.<RecordBatch>emptyList());
- }else{
- return super.visitScan(scan, context);
- }
- else if (subScan instanceof ParquetRowGroupScan){
- return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
- }
- else{
+ } else if (subScan instanceof JSONSubScan) {
+ return new JSONScanBatchCreator().getBatch(context, (JSONSubScan) subScan, Collections.<RecordBatch> emptyList());
+ } else if (subScan instanceof ParquetRowGroupScan) {
+ return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
+ } else {
return super.visitSubScan(subScan, context);
}
@@ -99,14 +94,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
@Override
public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
- if(op instanceof SelectionVectorRemover){
+ if (op instanceof SelectionVectorRemover) {
return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
- }else{
+ } else {
return super.visitOp(op, context);
}
}
-
@Override
public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
return sbc.getBatch(context, sort, getChildren(sort, context));
@@ -135,18 +129,20 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
return rrc.getBatch(context, op, null);
}
- private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+ private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
List<RecordBatch> children = Lists.newArrayList();
- for(PhysicalOperator child : op){
+ for (PhysicalOperator child : op) {
children.add(child.accept(this, context));
}
return children;
}
- public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
+ public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
ImplCreator i = new ImplCreator();
root.accept(i, context);
- if(i.root == null) throw new ExecutionSetupException("The provided fragment did not have a root node that correctly created a RootExec value.");
+ if (i.root == null)
+ throw new ExecutionSetupException(
+ "The provided fragment did not have a root node that correctly created a RootExec value.");
return i.getRoot();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
deleted file mode 100644
index 8a2de63..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ /dev/null
@@ -1,486 +0,0 @@
-package org.apache.drill.exec.store;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.Resources;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.*;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.vector.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import static com.fasterxml.jackson.core.JsonToken.*;
-
-public class JSONRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
- private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
- public static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private final String inputPath;
-
- private final Map<String, VectorHolder> valueVectorMap;
-
- private JsonParser parser;
- private SchemaIdGenerator generator;
- private DiffSchema diffSchema;
- private RecordSchema currentSchema;
- private List<Field> removedFields;
- private OutputMutator outputMutator;
- private BufferAllocator allocator;
- private int batchSize;
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
- this.inputPath = inputPath;
- this.allocator = fragmentContext.getAllocator();
- this.batchSize = batchSize;
- valueVectorMap = Maps.newHashMap();
- }
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
- this(fragmentContext, inputPath, DEFAULT_LENGTH);
- }
-
- private JsonParser getParser() {
- return parser;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- outputMutator = output;
- currentSchema = new ObjectSchema();
- diffSchema = new DiffSchema();
- removedFields = Lists.newArrayList();
-
- try {
- InputSupplier<InputStreamReader> input;
- if (inputPath.startsWith("resource:")) {
- input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
- } else {
- input = Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8);
- }
-
- JsonFactory factory = new JsonFactory();
- parser = factory.createJsonParser(input.getInput());
- parser.nextToken(); // Read to the first START_OBJECT token
- generator = new SchemaIdGenerator();
- } catch (IOException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public int next() {
- if (parser.isClosed() || !parser.hasCurrentToken()) {
- return 0;
- }
-
- resetBatch();
-
- int nextRowIndex = 0;
-
- try {
- while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
- parser.nextToken(); // Read to START_OBJECT token
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- break;
- }
- }
-
- parser.nextToken();
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- }
-
- // Garbage collect fields never referenced in this batch
- for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
- diffSchema.addRemovedField(field);
- outputMutator.removeField(field.getAsMaterializedField());
- }
-
- if (diffSchema.isChanged()) {
- outputMutator.setNewSchema();
- }
-
-
- } catch (IOException | SchemaChangeException e) {
- logger.error("Error reading next in Json reader", e);
- }
-
- for (VectorHolder holder : valueVectorMap.values()) {
- holder.populateVectorLength();
- }
-
- return nextRowIndex;
- }
-
- private void resetBatch() {
- for (VectorHolder value : valueVectorMap.values()) {
- value.reset();
- }
-
- currentSchema.resetMarkedFields();
- diffSchema.reset();
- removedFields.clear();
- }
-
- @Override
- public void cleanup() {
- try {
- parser.close();
- } catch (IOException e) {
- logger.warn("Error closing Json parser", e);
- }
- }
-
-
- private RecordSchema getCurrentSchema() {
- return currentSchema;
- }
-
- private void setCurrentSchema(RecordSchema schema) {
- currentSchema = schema;
- }
-
- private List<Field> getRemovedFields() {
- return removedFields;
- }
-
- public BufferAllocator getAllocator() {
- return allocator;
- }
-
- public static enum ReadType {
- ARRAY(END_ARRAY) {
- @Override
- public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- },
- OBJECT(END_OBJECT) {
- @Override
- public Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- };
-
- private final JsonToken endObject;
-
- ReadType(JsonToken endObject) {
- this.endObject = endObject;
- }
-
- public JsonToken getEndObject() {
- return endObject;
- }
-
- @SuppressWarnings("ConstantConditions")
- public boolean readRecord(JSONRecordReader reader,
- String prefixFieldName,
- int rowIndex,
- int groupCount) throws IOException, SchemaChangeException {
- JsonParser parser = reader.getParser();
- JsonToken token = parser.nextToken();
- JsonToken endObject = getEndObject();
- int colIndex = 0;
- boolean isFull = false;
- while (token != endObject) {
- if (token == FIELD_NAME) {
- token = parser.nextToken();
- continue;
- }
-
- String fieldName = parser.getCurrentName();
- MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
- ReadType readType = null;
- switch (token) {
- case START_ARRAY:
- readType = ReadType.ARRAY;
- groupCount++;
- break;
- case START_OBJECT:
- readType = ReadType.OBJECT;
- groupCount = 0;
- break;
- }
-
- if (fieldType != null) { // Including nulls
- boolean currentFieldFull = !recordData(
- readType,
- reader,
- fieldType,
- prefixFieldName,
- fieldName,
- rowIndex,
- colIndex,
- groupCount);
-
- isFull = isFull || currentFieldFull;
- }
- token = parser.nextToken();
- colIndex += 1;
- }
- return !isFull;
- }
-
- private void removeChildFields(List<Field> removedFields, Field field) {
- RecordSchema schema = field.getAssignedSchema();
- if (schema == null) {
- return;
- }
- for (Field childField : schema.getFields()) {
- removedFields.add(childField);
- if (childField.hasSchema()) {
- removeChildFields(removedFields, childField);
- }
- }
- }
-
- private boolean recordData(JSONRecordReader.ReadType readType,
- JSONRecordReader reader,
- MajorType fieldType,
- String prefixFieldName,
- String fieldName,
- int rowIndex,
- int colIndex,
- int groupCount) throws IOException, SchemaChangeException {
- RecordSchema currentSchema = reader.getCurrentSchema();
- Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
- boolean isFieldFound = field != null;
- List<Field> removedFields = reader.getRemovedFields();
- boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
- if (isFieldFound && !field.getFieldType().equals(fieldType)) {
- boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
- if (newFieldLateBound && !existingFieldLateBound) {
- fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
- } else if (!newFieldLateBound && existingFieldLateBound) {
- field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
- } else if (!newFieldLateBound && !existingFieldLateBound) {
- if (field.hasSchema()) {
- removeChildFields(removedFields, field);
- }
- removedFields.add(field);
- currentSchema.removeField(field, colIndex);
-
- isFieldFound = false;
- }
- }
-
- if (!isFieldFound) {
- field = createField(
- currentSchema,
- prefixFieldName,
- fieldName,
- fieldType,
- colIndex
- );
-
- reader.recordNewField(field);
- currentSchema.addField(field);
- }
-
- field.setRead(true);
-
- VectorHolder holder = getOrCreateVectorHolder(reader, field);
- if (readType != null) {
- RecordSchema fieldSchema = field.getAssignedSchema();
- RecordSchema newSchema = readType.createSchema();
-
- if (readType != ReadType.ARRAY) {
- reader.setCurrentSchema(fieldSchema);
- if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- } else {
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- }
-
- reader.setCurrentSchema(currentSchema);
-
- } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
- return addValueToVector(
- rowIndex,
- holder,
- JacksonHelper.getValueFromFieldType(
- reader.getParser(),
- fieldType.getMinorType()
- ),
- fieldType.getMinorType(),
- groupCount
- );
- }
-
- return true;
- }
-
- private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
- switch (minorType) {
- case INT: {
- holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
- NullableIntVector.Mutator m = int4.getMutator();
- m.set(index, (Integer) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated int is not supported.");
- }
-
- RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
- RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Integer) val);
- }
-
- return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
- }
- case FLOAT4: {
- holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
- NullableFloat4Vector.Mutator m = float4.getMutator();
- m.set(index, (Float) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated float is not supported.");
- }
-
- RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
- RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Float) val);
- }
- return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
- }
- case VARCHAR: {
- if (val == null) {
- return (index + 1) * 4 <= holder.getLength();
- } else {
- byte[] bytes = ((String) val).getBytes(UTF_8);
- int length = bytes.length;
- holder.incAndCheckLength(length);
- if (groupCount == 0) {
- NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
- NullableVarCharVector.Mutator m = varLen4.getMutator();
- m.set(index, bytes);
- } else {
- RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
- RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
- holder.setGroupCount(index);
- m.add(index, bytes);
- }
- return holder.hasEnoughSpace(length + 4 + 1);
- }
- }
- case BIT: {
- holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableBitVector bit = (NullableBitVector) holder.getValueVector();
- NullableBitVector.Mutator m = bit.getMutator();
- m.set(index, (Boolean) val ? 1 : 0);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
- }
-
- RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
- RepeatedBitVector.Mutator m = repeatedBit.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Boolean) val ? 1 : 0);
- }
- return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
- }
- default:
- throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
- }
- }
-
- private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
- return reader.getOrCreateVectorHolder(field);
- }
-
- public abstract RecordSchema createSchema() throws IOException;
-
- public abstract Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index);
- }
-
- private void recordNewField(Field field) {
- diffSchema.recordNewField(field);
- }
-
- private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
- String fullFieldName = field.getFullFieldName();
- VectorHolder holder = valueVectorMap.get(fullFieldName);
-
- if (holder == null) {
- MajorType type = field.getFieldType();
- MinorType minorType = type.getMinorType();
-
- if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
- return null;
- }
-
- MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
-
- ValueVector v = TypeHelper.getNewVector(f, allocator);
- AllocationHelper.allocate(v, batchSize, 50);
- holder = new VectorHolder(v);
- valueVectorMap.put(fullFieldName, holder);
- outputMutator.addField(v);
- return holder;
- }
- return holder;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 7cbea57..828ae17 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -27,11 +27,16 @@ public class VectorHolder {
private ValueVector vector;
private int currentLength;
- public VectorHolder(int length, ValueVector vector) {
- this.length = length;
- this.vector = vector;
- this.mutator = vector.getMutator();
- }
+
+ public VectorHolder(int length, ValueVector vector) {
+ this.length = length;
+ this.vector = vector;
+ }
+
+ public VectorHolder(ValueVector vector) {
+ this.length = vector.getValueCapacity();
+ this.vector = vector;
+ }
public ValueVector getValueVector() {
return vector;
@@ -47,9 +52,9 @@ public class VectorHolder {
}
public void setGroupCount(int groupCount) {
- if(this.groupCount < groupCount) {
+ if (this.groupCount < groupCount) {
RepeatedMutator mutator = (RepeatedMutator) vector.getMutator();
- while(this.groupCount < groupCount) {
+ while (this.groupCount < groupCount) {
mutator.startNewGroup(++this.groupCount);
}
}
@@ -71,7 +76,7 @@ public class VectorHolder {
public void populateVectorLength() {
ValueVector.Mutator mutator = vector.getMutator();
- if(vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
mutator.setValueCount(groupCount);
} else {
mutator.setValueCount(count);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
new file mode 100644
index 0000000..ff5f474
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("json-scan")
+public class JSONGroupScan extends AbstractGroupScan {
+ private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
+
+ private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
+ protected final List<JSONGroupScan.ScanEntry> readEntries;
+ private final OperatorCost cost;
+ private final Size size;
+
+ @JsonCreator
+ public JSONGroupScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
+ this.readEntries = readEntries;
+ OperatorCost cost = new OperatorCost(0,0,0,0);
+ Size size = new Size(0,0);
+ for(JSONGroupScan.ScanEntry r : readEntries){
+ cost = cost.add(r.getCost());
+ size = size.add(r.getSize());
+ }
+ this.cost = cost;
+ this.size = size;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+ checkArgument(endpoints.size() <= readEntries.size());
+
+ mappings = new LinkedList[endpoints.size()];
+
+ int i = 0;
+ for (ScanEntry e : readEntries) {
+ if (i == endpoints.size()) i = 0;
+ LinkedList entries = mappings[i];
+ if (entries == null) {
+ entries = new LinkedList<>();
+ mappings[i] = entries;
+ }
+ entries.add(e);
+ i++;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
+ return new JSONSubScan(mappings[minorFragmentId]);
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ return Collections.emptyList();
+ }
+
+ public List<JSONGroupScan.ScanEntry> getReadEntries() {
+ return readEntries;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new JSONGroupScan(readEntries);
+ }
+
+ public static class ScanEntry implements ReadEntry {
+ private final String url;
+ private Size size;
+
+ @JsonCreator
+ public ScanEntry(@JsonProperty("url") String url) {
+ this.url = url;
+ long fileLength = new File(URI.create(url)).length();
+ size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 1, 2, 2);
+ }
+
+ @Override
+ public Size getSize() {
+ return size;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return readEntries.size();
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return cost;
+ }
+
+ @Override
+ public Size getSize() {
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
new file mode 100644
index 0000000..eee0fb6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
@@ -0,0 +1,489 @@
+package org.apache.drill.exec.store.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.schema.*;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.VectorHolder;
+import org.apache.drill.exec.vector.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+import static com.fasterxml.jackson.core.JsonToken.*;
+
+public class JSONRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  /** Batch size used when the two-argument constructor is called. */
+  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+  /** Charset used to encode VARCHAR values; reuses the Guava constant instead of a name lookup. */
+  public static final Charset UTF_8 = Charsets.UTF_8;
+
+  /** Either a URL starting with "resource:" or a URI understood by java.io.File. */
+  private final String inputPath;
+
+  /** Vector holders keyed by full field name. */
+  private final Map<String, VectorHolder> valueVectorMap;
+
+  // State below is (re)initialised by setup().
+  private JsonParser parser;
+  private SchemaIdGenerator generator;
+  private DiffSchema diffSchema;
+  private RecordSchema currentSchema;
+  private List<Field> removedFields;
+  private OutputMutator outputMutator;
+
+  // Assigned once in the constructor and never changed afterwards.
+  private final BufferAllocator allocator;
+  private final int batchSize;
+
+  /**
+   * Creates a reader for one JSON input.
+   *
+   * @param fragmentContext supplies the buffer allocator
+   * @param inputPath       location of the JSON input
+   * @param batchSize       size handed to the vector allocation for each new vector
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
+    this.valueVectorMap = Maps.newHashMap();
+    this.inputPath = inputPath;
+    this.batchSize = batchSize;
+    this.allocator = fragmentContext.getAllocator();
+  }
+
+  /**
+   * Creates a reader that uses {@code DEFAULT_LENGTH} as its batch size.
+   *
+   * @param fragmentContext supplies the buffer allocator
+   * @param inputPath       location of the JSON input
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
+    this(fragmentContext, inputPath, JSONRecordReader.DEFAULT_LENGTH);
+  }
+
+  /** @return the parser created by setup(), or null if setup() has not created one */
+  private JsonParser getParser() {
+    return this.parser;
+  }
+
+  /**
+   * Prepares the reader: resets schema tracking, opens the input and advances the parser
+   * to its first token.
+   *
+   * @param output mutator that receives vector and schema changes
+   * @throws ExecutionSetupException if opening or tokenizing the input fails with an IOException
+   */
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    final String resourcePrefix = "resource:";
+
+    outputMutator = output;
+    currentSchema = new ObjectSchema();
+    diffSchema = new DiffSchema();
+    removedFields = Lists.newArrayList();
+
+    try {
+      // "resource:" inputs are looked up on the classpath; everything else is treated as a file URI.
+      final InputSupplier<InputStreamReader> source = inputPath.startsWith(resourcePrefix)
+          ? Resources.newReaderSupplier(
+              Resources.getResource(inputPath.substring(resourcePrefix.length())), Charsets.UTF_8)
+          : Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8);
+
+      final JsonFactory jsonFactory = new JsonFactory();
+      parser = jsonFactory.createJsonParser(source.getInput());
+      parser.nextToken(); // Read to the first START_OBJECT token
+      generator = new SchemaIdGenerator();
+    } catch (IOException ioe) {
+      throw new ExecutionSetupException(ioe);
+    }
+  }
+
+  /**
+   * Reads records into the value vectors until a field reports that it is full or the
+   * input is exhausted, then publishes schema changes and vector lengths.
+   *
+   * @return the number of records read into this batch; 0 when the parser is closed or
+   *         has no current token
+   * @throws DrillRuntimeException if reading the input or updating the schema fails
+   */
+  @Override
+  public int next() {
+    if (parser.isClosed() || !parser.hasCurrentToken()) {
+      return 0;
+    }
+
+    resetBatch();
+
+    int nextRowIndex = 0;
+
+    try {
+      // readRecord returns false once a field reported being full; the post-increment
+      // ensures that last record is still counted.
+      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
+        parser.nextToken(); // Read to START_OBJECT token
+
+        if (!parser.hasCurrentToken()) {
+          parser.close();
+          break;
+        }
+      }
+
+      parser.nextToken();
+
+      if (!parser.hasCurrentToken()) {
+        parser.close();
+      }
+
+      // Garbage collect fields never referenced in this batch
+      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+        diffSchema.addRemovedField(field);
+        outputMutator.removeField(field.getAsMaterializedField());
+      }
+
+      if (diffSchema.isChanged()) {
+        outputMutator.setNewSchema();
+      }
+
+    } catch (IOException | SchemaChangeException e) {
+      logger.error("Error reading next in Json reader", e);
+      // Do not hand a partially populated batch downstream as if it were complete:
+      // propagate the failure with its cause instead of swallowing it.
+      throw new DrillRuntimeException(e);
+    }
+
+    for (VectorHolder holder : valueVectorMap.values()) {
+      holder.populateVectorLength();
+    }
+
+    return nextRowIndex;
+  }
+
+  /** Clears per-batch state: every vector holder, the read marks, the schema diff and the removed-field list. */
+  private void resetBatch() {
+    for (VectorHolder holder : valueVectorMap.values()) {
+      holder.reset();
+    }
+    currentSchema.resetMarkedFields();
+    diffSchema.reset();
+    removedFields.clear();
+  }
+
+  /**
+   * Closes the JSON parser. A failure to close is logged and otherwise ignored, because
+   * there is nothing more the caller can do at this point.
+   */
+  @Override
+  public void cleanup() {
+    // setup() may have failed (or never run) before the parser was created.
+    if (parser == null) {
+      return;
+    }
+    try {
+      parser.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Json parser", e);
+    }
+  }
+
+
+  /** @return the schema that fields are currently resolved against */
+  private RecordSchema getCurrentSchema() {
+    return this.currentSchema;
+  }
+
+  /** Replaces the schema that fields are currently resolved against. */
+  private void setCurrentSchema(RecordSchema schema) {
+    this.currentSchema = schema;
+  }
+
+  /** @return the live list of fields removed so far in the current batch */
+  private List<Field> getRemovedFields() {
+    return this.removedFields;
+  }
+
+  /** @return the allocator obtained from the fragment context at construction time */
+  public BufferAllocator getAllocator() {
+    return this.allocator;
+  }
+
+ public static enum ReadType {
+    /** Reads the elements of a JSON array; terminated by END_ARRAY. */
+    ARRAY(END_ARRAY) {
+      @Override
+      public Field createField(RecordSchema parentSchema,
+                               String prefixFieldName,
+                               String fieldName,
+                               MajorType fieldType,
+                               int index) {
+        // index is not used by this implementation
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    },
+    /** Reads the fields of a JSON object; terminated by END_OBJECT. */
+    OBJECT(END_OBJECT) {
+      @Override
+      public Field createField(RecordSchema parentSchema,
+                               String prefixFieldName,
+                               String fieldName,
+                               MajorType fieldType,
+                               int index) {
+        // index is not used by this implementation
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    };
+
+    /** Token that terminates a structure of this type. */
+    private final JsonToken endObject;
+
+    ReadType(JsonToken terminator) {
+      this.endObject = terminator;
+    }
+
+    /** @return the token that ends a structure of this type */
+    public JsonToken getEndObject() {
+      return this.endObject;
+    }
+
+ // Reads one JSON structure of this type from the reader's parser, starting with the token
+ // after the current one and stopping at this type's end token. Every token that is not a
+ // FIELD_NAME is passed to recordData together with the parser's current field name and a
+ // running column index.
+ //
+ // Returns true when no recordData call returned false (no field reported being full) and
+ // false otherwise. Tokens after a "full" field are still consumed up to the end token.
+ @SuppressWarnings("ConstantConditions")
+ public boolean readRecord(JSONRecordReader reader,
+ String prefixFieldName,
+ int rowIndex,
+ int groupCount) throws IOException, SchemaChangeException {
+ JsonParser parser = reader.getParser();
+ JsonToken token = parser.nextToken();
+ JsonToken endObject = getEndObject();
+ int colIndex = 0;
+ boolean isFull = false;
+ while (token != endObject) {
+ // Field names carry no value of their own; the name is fetched via getCurrentName() below.
+ if (token == FIELD_NAME) {
+ token = parser.nextToken();
+ continue;
+ }
+
+ String fieldName = parser.getCurrentName();
+ // The second argument tells the helper whether the value is an element of an array.
+ MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
+ // readType stays null for scalar tokens and selects the nested reader otherwise.
+ ReadType readType = null;
+ // NOTE(review): groupCount is modified in place and not restored after the nested value,
+ // so later sibling values in this loop see the updated count - confirm this is intended.
+ switch (token) {
+ case START_ARRAY:
+ readType = ReadType.ARRAY;
+ groupCount++;
+ break;
+ case START_OBJECT:
+ readType = ReadType.OBJECT;
+ groupCount = 0;
+ break;
+ }
+
+ if (fieldType != null) { // Including nulls
+ boolean currentFieldFull = !recordData(
+ readType,
+ reader,
+ fieldType,
+ prefixFieldName,
+ fieldName,
+ rowIndex,
+ colIndex,
+ groupCount);
+
+ // Sticky flag: once any field is full the whole record reports full.
+ isFull = isFull || currentFieldFull;
+ }
+ token = parser.nextToken();
+ colIndex += 1;
+ }
+ return !isFull;
+ }
+
+ private void removeChildFields(List<Field> removedFields, Field field) {
+ RecordSchema schema = field.getAssignedSchema();
+ if (schema == null) {
+ return;
+ }
+ for (Field childField : schema.getFields()) {
+ removedFields.add(childField);
+ if (childField.hasSchema()) {
+ removeChildFields(removedFields, childField);
+ }
+ }
+ }
+
+ // Records a single value: reconciles it with the field already known to the reader's
+ // current schema (creating or replacing the field when needed), then either recurses into a
+ // nested array/object (readType != null) or writes the scalar into the field's vector.
+ //
+ // Returns the result of addValueToVector when a scalar was written, and true in every
+ // other case (nested structures, fields without a holder, late-bound types).
+ private boolean recordData(JSONRecordReader.ReadType readType,
+ JSONRecordReader reader,
+ MajorType fieldType,
+ String prefixFieldName,
+ String fieldName,
+ int rowIndex,
+ int colIndex,
+ int groupCount) throws IOException, SchemaChangeException {
+ RecordSchema currentSchema = reader.getCurrentSchema();
+ // When the parser supplied no field name, the prefix is used as the lookup name.
+ Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
+ boolean isFieldFound = field != null;
+ List<Field> removedFields = reader.getRemovedFields();
+ boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
+
+ // The field exists but its recorded type differs from the type of the incoming value.
+ if (isFieldFound && !field.getFieldType().equals(fieldType)) {
+ boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
+
+ if (newFieldLateBound && !existingFieldLateBound) {
+ // Incoming value is late-bound: adopt the minor type the field already has.
+ fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
+ } else if (!newFieldLateBound && existingFieldLateBound) {
+ // Existing field was late-bound: give it the minor type of the incoming value.
+ field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
+ } else if (!newFieldLateBound && !existingFieldLateBound) {
+ // Two different concrete types: drop the old field (and its children) and fall
+ // through to the creation branch below.
+ if (field.hasSchema()) {
+ removeChildFields(removedFields, field);
+ }
+ removedFields.add(field);
+ currentSchema.removeField(field, colIndex);
+
+ isFieldFound = false;
+ }
+ }
+
+ if (!isFieldFound) {
+ field = createField(
+ currentSchema,
+ prefixFieldName,
+ fieldName,
+ fieldType,
+ colIndex
+ );
+
+ reader.recordNewField(field);
+ currentSchema.addField(field);
+ }
+
+ field.setRead(true);
+
+ // May be null: the reader returns no holder for MAP and LATE typed fields.
+ VectorHolder holder = getOrCreateVectorHolder(reader, field);
+ if (readType != null) {
+ RecordSchema fieldSchema = field.getAssignedSchema();
+ // NOTE(review): newSchema is created even when it ends up unused, and it is not
+ // assigned to the field anywhere in this method - confirm that is intended.
+ RecordSchema newSchema = readType.createSchema();
+
+ // Nested objects are read against the field's own schema (or a fresh one); arrays keep
+ // the reader's current schema. NOTE(review): the boolean returned by the nested
+ // readRecord call is discarded in both branches.
+ if (readType != ReadType.ARRAY) {
+ reader.setCurrentSchema(fieldSchema);
+ if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+ readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+ } else {
+ readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+ }
+
+ // Restore the schema that was current when this method was entered.
+ reader.setCurrentSchema(currentSchema);
+
+ } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
+ return addValueToVector(
+ rowIndex,
+ holder,
+ JacksonHelper.getValueFromFieldType(
+ reader.getParser(),
+ fieldType.getMinorType()
+ ),
+ fieldType.getMinorType(),
+ groupCount
+ );
+ }
+
+ return true;
+ }
+
+ // Writes val into the holder's vector according to minorType. With groupCount == 0 the
+ // vector is cast to the nullable variant and the value is stored with set(index, ...);
+ // otherwise it is cast to the repeated variant and the value is appended with
+ // add(index, ...).
+ //
+ // Returns what the holder reports via hasEnoughSpace for one more value of the same size
+ // (for a null VARCHAR, a length comparison is returned instead).
+ // Throws UnsupportedOperationException for a null INT, FLOAT4 or BIT inside a repeated
+ // group, and DrillRuntimeException for any minor type not handled below.
+ // NOTE(review): the "WIDTH * 8 + 1" size expressions are passed through as-is; their unit
+ // is not evident from this file - confirm against VectorHolder.
+ private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
+ switch (minorType) {
+ case INT: {
+ holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
+ if (groupCount == 0) {
+ // A null value is simply not written to the nullable vector.
+ if (val != null) {
+ NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
+ NullableIntVector.Mutator m = int4.getMutator();
+ m.set(index, (Integer) val);
+ }
+ } else {
+ if (val == null) {
+ throw new UnsupportedOperationException("Nullable repeated int is not supported.");
+ }
+
+ RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
+ RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+ holder.setGroupCount(index);
+ m.add(index, (Integer) val);
+ }
+
+ return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
+ }
+ case FLOAT4: {
+ holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
+ if (groupCount == 0) {
+ // A null value is simply not written to the nullable vector.
+ if (val != null) {
+ NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
+ NullableFloat4Vector.Mutator m = float4.getMutator();
+ m.set(index, (Float) val);
+ }
+ } else {
+ if (val == null) {
+ throw new UnsupportedOperationException("Nullable repeated float is not supported.");
+ }
+
+ RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
+ RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
+ holder.setGroupCount(index);
+ m.add(index, (Float) val);
+ }
+ return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
+ }
+ case VARCHAR: {
+ // A null string writes nothing, regardless of groupCount; only a length check is returned.
+ if (val == null) {
+ return (index + 1) * 4 <= holder.getLength();
+ } else {
+ byte[] bytes = ((String) val).getBytes(UTF_8);
+ int length = bytes.length;
+ holder.incAndCheckLength(length);
+ if (groupCount == 0) {
+ NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
+ NullableVarCharVector.Mutator m = varLen4.getMutator();
+ m.set(index, bytes);
+ } else {
+ RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
+ RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+ holder.setGroupCount(index);
+ m.add(index, bytes);
+ }
+ return holder.hasEnoughSpace(length + 4 + 1);
+ }
+ }
+ case BIT: {
+ holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
+ if (groupCount == 0) {
+ // A null value is simply not written to the nullable vector.
+ if (val != null) {
+ NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+ NullableBitVector.Mutator m = bit.getMutator();
+ m.set(index, (Boolean) val ? 1 : 0);
+ }
+ } else {
+ if (val == null) {
+ throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
+ }
+
+ RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
+ RepeatedBitVector.Mutator m = repeatedBit.getMutator();
+ holder.setGroupCount(index);
+ m.add(index, (Boolean) val ? 1 : 0);
+ }
+ return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
+ }
+ default:
+ throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
+ }
+ }
+
+ private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+ return reader.getOrCreateVectorHolder(field);
+ }
+
+    /** Creates the schema used when a nested structure of this type has none assigned yet. */
+    public abstract RecordSchema createSchema() throws IOException;
+
+    /** Creates the schema field that represents a newly seen value of this type. */
+    public abstract Field createField(
+        RecordSchema parentSchema,
+        String prefixFieldName,
+        String fieldName,
+        MajorType fieldType,
+        int index);
+ }
+
+  /** Registers {@code field} as newly added in the schema diff of the current batch. */
+  private void recordNewField(Field field) {
+    this.diffSchema.recordNewField(field);
+  }
+
+ private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+ String fullFieldName = field.getFullFieldName();
+ VectorHolder holder = valueVectorMap.get(fullFieldName);
+
+ if (holder == null) {
+ MajorType type = field.getFieldType();
+ MinorType minorType = type.getMinorType();
+
+ if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
+ return null;
+ }
+
+ MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
+
+ ValueVector v = TypeHelper.getNewVector(f, allocator);
+ AllocationHelper.allocate(v, batchSize, 50);
+ holder = new VectorHolder(v);
+ valueVectorMap.put(fullFieldName, holder);
+ outputMutator.addField(v);
+ return holder;
+ }
+ return holder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
new file mode 100644
index 0000000..eda6b75
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+/**
+ * Builds the scan batch for a {@link JSONSubScan}: one {@link JSONRecordReader} per read entry.
+ */
+public class JSONScanBatchCreator implements BatchCreator<JSONSubScan> {
+
+  /**
+   * @param context  fragment context passed to every reader and to the batch
+   * @param config   sub-scan whose read entries are turned into readers
+   * @param children must be empty
+   * @return a {@link ScanBatch} iterating over the created readers
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, JSONSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    final List<RecordReader> recordReaders = Lists.newArrayList();
+    for (JSONGroupScan.ScanEntry entry : config.getReadEntries()) {
+      final String url = entry.getUrl();
+      recordReaders.add(new JSONRecordReader(context, url));
+    }
+
+    return new ScanBatch(context, recordReaders.iterator());
+  }
+}