Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 04:48:29 UTC
[07/10] Add support for RepeatedMapVector, MapVector and RepeatedListVector.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 396834c..4ff3708 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -18,12 +18,14 @@
package org.apache.drill.exec.record;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
import java.util.List;
-import io.netty.buffer.CompositeByteBuf;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import javax.jdo.metadata.FieldMetadata;
+
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
@@ -63,7 +65,7 @@ public class WritableBatch {
Preconditions.checkState(!cleared,
"Attempted to reconstruct a container from a WritableBatch after it had been cleared");
if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
-
+
CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
/* Copy data from each buffer into the compound buffer */
@@ -71,8 +73,7 @@ public class WritableBatch {
cbb.addComponent(buf);
}
-
- List<FieldMetadata> fields = def.getFieldList();
+ List<SerializedField> fields = def.getFieldList();
int bufferOffset = 0;
@@ -82,7 +83,7 @@ public class WritableBatch {
int vectorIndex = 0;
for (VectorWrapper<?> vv : container) {
- FieldMetadata fmd = fields.get(vectorIndex);
+ SerializedField fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength());
// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -127,7 +128,7 @@ public class WritableBatch {
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<ByteBuf> buffers = Lists.newArrayList();
- List<FieldMetadata> metadata = Lists.newArrayList();
+ List<SerializedField> metadata = Lists.newArrayList();
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
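Note on this hunk: the substantive change is the protobuf rename from UserBitShared.FieldMetadata to UserBitShared.SerializedField; the javax.jdo.metadata.FieldMetadata import added above appears to be a stray auto-import, since nothing left in the file uses a FieldMetadata type. A hedged sketch of the reconstruct loop after the rename, using only names visible in the hunks (the load call is shown commented out above, so its exact placement here is an assumption):

    List<SerializedField> fields = def.getFieldList();
    int bufferOffset = 0;
    int vectorIndex = 0;
    for (VectorWrapper<?> vv : container) {
      SerializedField fmd = fields.get(vectorIndex);               // per-vector metadata
      ValueVector v = vv.getValueVector();
      v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); // wire bytes -> vector
      vectorIndex++;
      bufferOffset += fmd.getBufferLength();
    }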
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 872052c..04a9768 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -44,15 +44,15 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
this(name, context, fs, storageConfig, new JSONFormatConfig());
}
-
+
public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
}
-
+
@Override
- public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+ public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
List<SchemaPath> columns) throws ExecutionSetupException {
- return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+ return new JSONRecordReader2(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
}
@Override
@@ -78,6 +78,6 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
return true;
return false;
}
-
+
}
}
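The only functional change in this file: the plugin now hands out the new JSONRecordReader2 (added later in this patch) in place of the deleted JSONRecordReader. For orientation, a hedged sketch of how any reader returned here is driven, using just the RecordReader methods that appear in this patch (the caller shown is assumed):

    RecordReader reader = plugin.getRecordReader(context, fileWork, columns);
    reader.setup(outputMutator);        // bind the reader to its output vectors
    int rows;
    while ((rows = reader.next()) > 0) {
      // consume 'rows' records from the populated vectors, one batch at a time
    }
    reader.cleanup();                   // close the underlying stream/parser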
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
deleted file mode 100644
index 1c8539c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.json;
-
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.DiffSchema;
-import org.apache.drill.exec.schema.Field;
-import org.apache.drill.exec.schema.NamedField;
-import org.apache.drill.exec.schema.ObjectSchema;
-import org.apache.drill.exec.schema.RecordSchema;
-import org.apache.drill.exec.schema.SchemaIdGenerator;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.VectorHolder;
-import org.apache.drill.exec.vector.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class JSONRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
- private static final int DEFAULT_LENGTH = 4000;
- public static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private final Map<String, VectorHolder> valueVectorMap;
- private final FileSystem fileSystem;
- private final Path hadoopPath;
-
- private JsonParser parser;
- private SchemaIdGenerator generator;
- private DiffSchema diffSchema;
- private RecordSchema currentSchema;
- private List<Field> removedFields;
- private OutputMutator outputMutator;
- private int batchSize;
- private final List<SchemaPath> columns;
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
- List<SchemaPath> columns) throws OutOfMemoryException {
- this.hadoopPath = new Path(inputPath);
- this.fileSystem = fileSystem;
- this.batchSize = batchSize;
- valueVectorMap = Maps.newHashMap();
- this.columns = columns;
- }
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
- List<SchemaPath> columns) throws OutOfMemoryException {
- this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns);
- }
-
- private JsonParser getParser() {
- return parser;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- outputMutator = output;
- output.removeAllFields();
- currentSchema = new ObjectSchema();
- diffSchema = new DiffSchema();
- removedFields = Lists.newArrayList();
-
- try {
- JsonFactory factory = new JsonFactory();
- parser = factory.createJsonParser(fileSystem.open(hadoopPath));
- parser.nextToken(); // Read to the first START_OBJECT token
- generator = new SchemaIdGenerator();
- } catch (IOException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public int next() {
- if (parser.isClosed() || !parser.hasCurrentToken()) {
- return 0;
- }
-
- resetBatch();
-
- int nextRowIndex = 0;
-
- try {
- while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
- parser.nextToken(); // Read to START_OBJECT token
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- break;
- }
- }
-
- parser.nextToken();
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- }
-
- // Garbage collect fields never referenced in this batch
- for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
- diffSchema.addRemovedField(field);
- outputMutator.removeField(field.getAsMaterializedField());
- }
-
- if (diffSchema.isChanged()) {
- outputMutator.setNewSchema();
- }
-
-
- } catch (IOException | SchemaChangeException e) {
- logger.error("Error reading next in Json reader", e);
- throw new DrillRuntimeException(e);
- }
-
- for (VectorHolder holder : valueVectorMap.values()) {
- if (holder.isRepeated()) {
- holder.setGroupCount(nextRowIndex);
- }
- holder.getValueVector().getMutator().setValueCount(nextRowIndex);
- }
-
- return nextRowIndex;
- }
-
- private void resetBatch() {
- for (VectorHolder value : valueVectorMap.values()) {
- value.reset();
- }
-
- currentSchema.resetMarkedFields();
- diffSchema.reset();
- removedFields.clear();
- }
-
- @Override
- public void cleanup() {
- try {
- parser.close();
- } catch (IOException e) {
- logger.warn("Error closing Json parser", e);
- }
- }
-
-
- private RecordSchema getCurrentSchema() {
- return currentSchema;
- }
-
- private void setCurrentSchema(RecordSchema schema) {
- currentSchema = schema;
- }
-
- private List<Field> getRemovedFields() {
- return removedFields;
- }
-
- private boolean fieldSelected(String field){
-
- SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\."));
- if (this.columns != null && this.columns.size() > 0){
- for (SchemaPath expr : this.columns){
- if ( sp.equals(expr)){
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- public static enum ReadType {
- ARRAY(END_ARRAY) {
- @Override
- public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- },
- OBJECT(END_OBJECT) {
- @Override
- public Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- };
-
- private final JsonToken endObject;
-
- ReadType(JsonToken endObject) {
- this.endObject = endObject;
- }
-
- public JsonToken getEndObject() {
- return endObject;
- }
-
- @SuppressWarnings("ConstantConditions")
- public boolean readRecord(JSONRecordReader reader,
- String prefixFieldName,
- int rowIndex,
- int groupCount) throws IOException, SchemaChangeException {
- JsonParser parser = reader.getParser();
- JsonToken token = parser.nextToken();
- JsonToken endObject = getEndObject();
- int colIndex = 0;
- boolean isFull = false;
- while (token != endObject) {
- if (token == FIELD_NAME) {
- token = parser.nextToken();
- continue;
- }
-
- String fieldName = parser.getCurrentName();
- if ( fieldName != null && ! reader.fieldSelected(fieldName)){
- // this field was not requested in the query
- token = parser.nextToken();
- colIndex += 1;
- continue;
- }
- MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
- ReadType readType = null;
- switch (token) {
- case START_ARRAY:
- readType = ReadType.ARRAY;
- groupCount++;
- break;
- case START_OBJECT:
- readType = ReadType.OBJECT;
- groupCount = 0;
- break;
- }
-
- if (fieldType != null) { // Including nulls
- boolean currentFieldFull = !recordData(
- readType,
- reader,
- fieldType,
- prefixFieldName,
- fieldName,
- rowIndex,
- colIndex,
- groupCount);
- if(readType == ReadType.ARRAY) {
- groupCount--;
- }
- isFull = isFull || currentFieldFull;
- }
- token = parser.nextToken();
- colIndex += 1;
- }
- return !isFull;
- }
-
- private void removeChildFields(List<Field> removedFields, Field field) {
- RecordSchema schema = field.getAssignedSchema();
- if (schema == null) {
- return;
- }
- for (Field childField : schema.getFields()) {
- removedFields.add(childField);
- if (childField.hasSchema()) {
- removeChildFields(removedFields, childField);
- }
- }
- }
-
- private boolean recordData(JSONRecordReader.ReadType readType,
- JSONRecordReader reader,
- MajorType fieldType,
- String prefixFieldName,
- String fieldName,
- int rowIndex,
- int colIndex,
- int groupCount) throws IOException, SchemaChangeException {
- RecordSchema currentSchema = reader.getCurrentSchema();
- Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
- boolean isFieldFound = field != null;
- List<Field> removedFields = reader.getRemovedFields();
- boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
- if (isFieldFound && !field.getFieldType().equals(fieldType)) {
- boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
- if (newFieldLateBound && !existingFieldLateBound) {
- fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
- } else if (!newFieldLateBound && existingFieldLateBound) {
- field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
- } else if (!newFieldLateBound && !existingFieldLateBound) {
- if (field.hasSchema()) {
- removeChildFields(removedFields, field);
- }
- removedFields.add(field);
- currentSchema.removeField(field, colIndex);
-
- isFieldFound = false;
- }
- }
-
- if (!isFieldFound) {
- field = createField(
- currentSchema,
- prefixFieldName,
- fieldName,
- fieldType,
- colIndex
- );
-
- reader.recordNewField(field);
- currentSchema.addField(field);
- }
-
- field.setRead(true);
-
- VectorHolder holder = getOrCreateVectorHolder(reader, field);
- if (readType != null) {
- RecordSchema fieldSchema = field.getAssignedSchema();
- RecordSchema newSchema = readType.createSchema();
-
- if (readType != ReadType.ARRAY) {
- reader.setCurrentSchema(fieldSchema);
- if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- } else {
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- }
-
- reader.setCurrentSchema(currentSchema);
-
- } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
- return addValueToVector(
- rowIndex,
- holder,
- JacksonHelper.getValueFromFieldType(
- reader.getParser(),
- fieldType.getMinorType()
- ),
- fieldType.getMinorType(),
- groupCount
- );
- }
-
- return true;
- }
-
- private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
- switch (minorType) {
- case BIGINT: {
- holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableBigIntVector int4 = (NullableBigIntVector) holder.getValueVector();
- NullableBigIntVector.Mutator m = int4.getMutator();
- m.set(index, (Long) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated int is not supported.");
- }
-
- RepeatedBigIntVector repeatedInt4 = (RepeatedBigIntVector) holder.getValueVector();
- RepeatedBigIntVector.Mutator m = repeatedInt4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Long) val);
- }
-
- return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
- }
- case FLOAT4: {
- holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
- NullableFloat4Vector.Mutator m = float4.getMutator();
- m.set(index, (Float) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated float is not supported.");
- }
-
- RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
- RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Float) val);
- }
- return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
- }
- case VARCHAR: {
- if (val == null) {
- return (index + 1) * 4 <= holder.getLength();
- } else {
- byte[] bytes = ((String) val).getBytes(UTF_8);
- int length = bytes.length;
- holder.incAndCheckLength(length);
- if (groupCount == 0) {
- NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
- NullableVarCharVector.Mutator m = varLen4.getMutator();
- m.set(index, bytes);
- } else {
- RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
- RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
- holder.setGroupCount(index);
- m.add(index, bytes);
- }
- return holder.hasEnoughSpace(length + 4 + 1);
- }
- }
- case BIT: {
- holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableBitVector bit = (NullableBitVector) holder.getValueVector();
- NullableBitVector.Mutator m = bit.getMutator();
- m.set(index, (Boolean) val ? 1 : 0);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
- }
-
- RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
- RepeatedBitVector.Mutator m = repeatedBit.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Boolean) val ? 1 : 0);
- }
- return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
- }
- default:
- throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
- }
- }
-
- private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
- return reader.getOrCreateVectorHolder(field);
- }
-
- public abstract RecordSchema createSchema() throws IOException;
-
- public abstract Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index);
- }
-
- private void recordNewField(Field field) {
- diffSchema.recordNewField(field);
- }
-
- private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
- String fullFieldName = field.getFullFieldName();
- VectorHolder holder = valueVectorMap.get(fullFieldName);
-
- if (holder == null) {
- MajorType type = field.getFieldType();
- MinorType minorType = type.getMinorType();
-
- if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
- return null;
- }
-
- MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type);
-
- ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode()));
- AllocationHelper.allocate(v, batchSize, 50);
- holder = new VectorHolder(v);
- valueVectorMap.put(fullFieldName, holder);
- return holder;
- }
- return holder;
- }
-}
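That deletion removes roughly 530 lines of hand-rolled schema tracking (DiffSchema, removedFields) and a per-type value switch. For reference, the pattern that switch applied to every type, condensed from the BIGINT case of addValueToVector above: scalar values go through the nullable mutator, array elements through add() on the repeated mutator after the group count is recorded:

    if (groupCount == 0) {                                  // scalar field
      if (val != null) {
        NullableBigIntVector v = (NullableBigIntVector) holder.getValueVector();
        v.getMutator().set(index, (Long) val);
      }
    } else {                                                // inside a JSON array
      holder.setGroupCount(index);
      RepeatedBigIntVector v = (RepeatedBigIntVector) holder.getValueVector();
      v.getMutator().add(index, (Long) val);
    }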
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
new file mode 100644
index 0000000..bb52a20
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.hive12.common.collect.Lists;
+
+public class JSONRecordReader2 implements RecordReader{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
+
+ private OutputMutator mutator;
+ private VectorContainerWriter writer;
+ private Path hadoopPath;
+ private FileSystem fileSystem;
+ private InputStream stream;
+ private JsonReader jsonReader;
+
+ public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+ List<SchemaPath> columns) throws OutOfMemoryException {
+ this.hadoopPath = new Path(inputPath);
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ try{
+ stream = fileSystem.open(hadoopPath);
+ JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
+ this.writer = new VectorContainerWriter(output);
+ this.mutator = output;
+ jsonReader = new JsonReader(splitter);
+ }catch(IOException e){
+ throw new ExecutionSetupException("Failure reading JSON file.", e);
+ }
+ }
+
+ @Override
+ public int next() {
+ writer.allocate();
+ writer.reset();
+
+ int i =0;
+
+ try{
+ outside: while(true){
+ writer.setPosition(i);
+
+ switch(jsonReader.write(writer)){
+ case WRITE_SUCCEED:
+ i++;
+ break;
+
+ case NO_MORE:
+// System.out.println("no more records - main loop");
+ break outside;
+
+ case WRITE_FAILED:
+// System.out.println("==== hit bounds at " + i);
+ break outside;
+ };
+ }
+
+
+ writer.setValueCount(i);
+ mutator.setNewSchema();
+ return i;
+
+ }catch(IOException | SchemaChangeException e){
+ throw new DrillRuntimeException("Failure while reading JSON file.", e);
+ }
+
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ logger.warn("Failure while closing stream.", e);
+ }
+ }
+
+}
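JSONRecordReader2 replaces all of that bookkeeping with JsonReader plus VectorContainerWriter; its next() method is just a labeled loop over a three-way write result. A minimal, self-contained Java sketch of that loop protocol (the enum and fake source below are stand-ins; the real constants come from jsonReader.write()):

    public class WriteLoopSketch {
      enum WriteResult { WRITE_SUCCEED, NO_MORE, WRITE_FAILED }

      static int remaining = 3;                  // pretend three records exist
      static WriteResult write() {
        return remaining-- > 0 ? WriteResult.WRITE_SUCCEED : WriteResult.NO_MORE;
      }

      public static void main(String[] args) {
        int i = 0;
        outside: while (true) {
          switch (write()) {
            case WRITE_SUCCEED:
              i++;                               // record landed; advance position
              break;
            case NO_MORE:                        // input exhausted
            case WRITE_FAILED:                   // batch full; stop and return i
              break outside;
          }
        }
        System.out.println("records in this batch: " + i);   // prints 3
      }
    }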
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 25931db..9e01268 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -224,7 +224,7 @@ public class HiveRecordReader implements RecordReader {
PrimitiveCategory pCat = primitiveCategories.get(i);
MajorType type = getMajorType(pCat);
MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type);
- ValueVector vv = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ ValueVector vv = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
vectors.add(vv);
}
for (int i = 0; i < selectedPartitionNames.size(); i++) {
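The added cast here (and the identical ones in the mock and Parquet readers below) is mechanical fallout: with this patch, the class returned by TypeHelper.getValueVectorClass evidently no longer matches addField's Class<? extends ValueVector> parameter without help. A self-contained illustration, with stand-in types, of why the unchecked cast is required:

    public class CastSketch {
      interface ValueVector {}
      static class IntVector implements ValueVector {}

      static Class<?> lookup() { return IntVector.class; }  // plays TypeHelper's role

      static void addField(Class<? extends ValueVector> clazz) {
        System.out.println("adding " + clazz.getSimpleName());
      }

      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        // Class<?> does not convert implicitly to the bounded wildcard; the
        // explicit cast compiles with an "unchecked" warning, which is exactly
        // what the diff adds at each call site.
        addField((Class<? extends ValueVector>) lookup());
      }
    }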
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index eb9e7a6..5c07dc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -76,7 +76,7 @@ public class MockRecordReader implements RecordReader {
for (int i = 0; i < config.getTypes().length; i++) {
MajorType type = config.getTypes()[i].getMajorType();
- valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
}
output.setNewSchema();
} catch (SchemaChangeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 75cd799..5d28456 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import com.google.common.base.Preconditions;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -41,10 +42,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.*;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -263,7 +260,7 @@ public class ParquetRecordReader implements RecordReader {
//convertedTypes.put()
fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
convertedType);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 86aec44..f7a74c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector;
public class AllocationHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
-
+
public static void allocate(ValueVector v, int valueCount, int bytesPerValue){
allocate(v, valueCount, bytesPerValue, 5);
}
-
+
public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
if(v instanceof FixedWidthVector){
((FixedWidthVector) v).allocateNew(valueCount);
@@ -34,7 +34,7 @@ public class AllocationHelper {
}else if(v instanceof RepeatedVariableWidthVector){
((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop);
}else{
- throw new UnsupportedOperationException();
+ v.allocateNew();
}
}
}
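The behavioral change: a vector that is neither fixed-width, variable-width, nor one of their repeated variants used to make this helper throw; now it falls back to the vector's own allocateNew(). That is what lets the complex vectors introduced in this patch pass through here, since MapVector.allocateNew() (below) delegates to its children. A hedged usage sketch:

    static void allocateForBatch(ValueVector v, int batchRecordCount) {
      // Scalar vectors get explicit sizing from the helper; complex vectors
      // (e.g. the new MapVector) hit the new fallback and size themselves
      // instead of triggering the old UnsupportedOperationException.
      AllocationHelper.allocate(v, batchRecordCount, 50);  // ~50 bytes/value assumed
    }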
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ddddab1..9641e6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -17,21 +17,25 @@
*/
package org.apache.drill.exec.vector;
+import java.util.Iterator;
+
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
+import com.google.hive12.common.collect.Iterators;
+
public abstract class BaseDataValueVector extends BaseValueVector{
protected ByteBuf data = DeadBuf.DEAD_BUFFER;
protected int valueCount;
-
+
public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
-
+
}
/**
@@ -46,7 +50,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
}
}
-
+
@Override
public ByteBuf[] getBuffers(){
ByteBuf[] out;
@@ -60,18 +64,24 @@ public abstract class BaseDataValueVector extends BaseValueVector{
clear();
return out;
}
-
+
public int getBufferSize() {
if(valueCount == 0) return 0;
return data.writerIndex();
}
@Override
- public abstract FieldMetadata getMetadata();
+ public abstract SerializedField getMetadata();
public ByteBuf getData(){
return data;
}
-
-
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 7cc1adf..7a61475 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -17,14 +17,21 @@
*/
package org.apache.drill.exec.vector;
+import java.util.Iterator;
+
import org.apache.drill.common.expression.FieldReference;
+
import io.netty.buffer.ByteBuf;
+
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
+import com.google.hive12.common.collect.Iterators;
+
public abstract class BaseValueVector implements ValueVector{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
-
+
protected final BufferAllocator allocator;
protected final MaterializedField field;
@@ -32,21 +39,24 @@ public abstract class BaseValueVector implements ValueVector{
this.allocator = allocator;
this.field = field;
}
-
+
@Override
public void close() {
clear();
}
-
+
@Override
public MaterializedField getField() {
return field;
}
-
+
public MaterializedField getField(FieldReference ref){
return getField().clone(ref);
}
-
+
+ protected SerializedField.Builder getMetadataBuilder(){
+ return getField().getAsBuilder();
+ }
abstract public ByteBuf getData();
@@ -54,12 +64,15 @@ public abstract class BaseValueVector implements ValueVector{
public abstract int getValueCount();
public void reset(){}
}
-
+
abstract class BaseMutator implements Mutator{
public void reset(){}
}
-
-
-
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Iterators.emptyIterator();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 63384a3..597b0f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -24,9 +24,12 @@ import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.NullableBitHolder;
import org.apache.drill.exec.memory.AccountingByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.impl.BitReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
/**
* Bit implements a vector of bit-width values. Elements in the vector are accessed by position from the logical start
@@ -49,11 +52,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public FieldMetadata getMetadata() {
- return FieldMetadata.newBuilder()
- .setDef(getField().getDef())
- .setValueCount(valueCount)
- .setBufferLength( (int) Math.ceil(valueCount / 8.0))
+ public SerializedField getMetadata() {
+ return field.getAsBuilder() //
+ .setValueCount(valueCount) //
+ .setBufferLength( (int) Math.ceil(valueCount / 8.0)) //
.build();
}
@@ -66,13 +68,26 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
public void allocateNew() {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ public boolean allocateNewSafe() {
clear();
if (allocationMonitor > 5) {
allocationValueCount = Math.min(1, (int)(allocationValueCount * 0.9));
} else if (allocationMonitor < -5) {
allocationValueCount = (int) (allocationValueCount * 1.1);
}
- allocateNew(allocationValueCount);
+
+ clear();
+ valueCapacity = allocationValueCount;
+ int valueSize = getSizeFromCount(allocationValueCount);
+ data = allocator.buffer(valueSize);
+ if(data == null) return false;
+ for (int i = 0; i < valueSize; i++) {
+ data.setByte(i, 0);
+ }
+ return true;
}
/**
@@ -112,8 +127,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public void load(FieldMetadata metadata, ByteBuf buffer) {
- assert this.field.getDef().equals(metadata.getDef());
+ public void load(SerializedField metadata, ByteBuf buffer) {
+ assert this.field.matches(metadata);
int loaded = load(metadata.getValueCount(), buffer);
assert metadata.getBufferLength() == loaded;
}
@@ -177,9 +192,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
}
- private void copyTo(int startIndex, int length, BitVector target) {
-
- }
private class TransferImpl implements TransferPair {
BitVector to;
@@ -205,8 +217,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public void copyValue(int fromIndex, int toIndex) {
- to.copyFrom(fromIndex, toIndex, BitVector.this);
+ public boolean copyValueSafe(int fromIndex, int toIndex) {
+ return to.copyFromSafe(fromIndex, toIndex, BitVector.this);
}
}
@@ -233,7 +245,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public final Object getObject(int index) {
+ public final Boolean getObject(int index) {
return new Boolean(get(index) != 0);
}
@@ -245,9 +257,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
holder.value = get(index);
}
- final void get(int index, NullableBitHolder holder) {
+ public final void get(int index, NullableBitHolder holder) {
+ holder.isSet = 1;
holder.value = get(index);
}
+
+ @Override
+ public FieldReader getReader() {
+ return new BitReaderImpl(BitVector.this);
+ }
}
/**
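Two points in the BitVector hunks: allocateNew() is now a thin wrapper over the non-throwing allocateNewSafe(), and both getMetadata() and the allocation path share the same bit-packing arithmetic. A self-contained sketch of that sizing (getSizeFromCount is the name used in the hunk):

    public class BitSizing {
      static int getSizeFromCount(int valueCount) {
        return (int) Math.ceil(valueCount / 8.0);  // one bit per value, rounded up to whole bytes
      }
      public static void main(String[] args) {
        System.out.println(getSizeFromCount(1));   // 1
        System.out.println(getSizeFromCount(8));   // 1
        System.out.println(getSizeFromCount(9));   // 2
      }
    }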
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
index 8e097e4..ad2ba1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
@@ -18,5 +18,5 @@
package org.apache.drill.exec.vector;
public interface RepeatedMutator extends ValueVector.Mutator {
- public void startNewGroup(int index);
+ public boolean startNewGroup(int index);
}
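startNewGroup now reports success instead of returning void, matching the copyValueSafe/allocateNewSafe convention adopted across this patch. A hedged sketch of a caller honoring the new contract (the surrounding batch logic is assumed):

    static boolean startGroupOrEndBatch(RepeatedMutator m, int index) {
      if (!m.startNewGroup(index)) {
        // Buffers are full: the caller should setValueCount(...), ship the
        // current batch, and retry this group at the start of the next one.
        return false;
      }
      return true;
    }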
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 258b354..8b871fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -22,24 +22,34 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
/**
* ValueVectorTypes defines a set of template-generated classes which implement type-specific value vectors. The
* template approach was chosen due to the lack of multiple inheritence. It is also important that all related logic be
* as efficient as possible.
*/
-public interface ValueVector extends Closeable {
+public interface ValueVector extends Closeable, Iterable<ValueVector> {
+
/**
* Allocate new buffers. ValueVector implements logic to determine how much to allocate.
+ * @throws OutOfMemoryRuntimeException Thrown if no memory can be allocated.
*/
- public void allocateNew();
+ public void allocateNew() throws OutOfMemoryRuntimeException;
+
+ /**
+ * Allocates new buffers. ValueVector implements logic to determine how much to allocate.
+ * @return Returns true if allocation was successful.
+ */
+ public boolean allocateNewSafe();
public int getBufferSize();
-
+
/**
* Alternative to clear(). Allows use as closeable in try-with-resources.
*/
@@ -52,27 +62,26 @@ public interface ValueVector extends Closeable {
/**
* Get information about how this field is materialized.
- *
+ *
* @return
*/
public MaterializedField getField();
/**
- * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the same
- * type. Will also generate a second instance of this vector class that is connected through the TransferPair.
- *
- * @return
+ * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the
+ * same type. Will also generate a second instance of this vector class that is connected through the TransferPair.
+ *
+ * @return
*/
public TransferPair getTransferPair();
public TransferPair makeTransferPair(ValueVector to);
-
-
+
public TransferPair getTransferPair(FieldReference ref);
/**
* Given the current buffer allocation, return the maximum number of values that this buffer can contain.
- *
+ *
* @return Maximum values buffer can contain. In the case of a Repeated field, this is the number of atoms, not
* repeated groups.
*/
@@ -80,37 +89,40 @@ public interface ValueVector extends Closeable {
/**
* Get Accessor to read value vector data.
- *
+ *
* @return
*/
public abstract Accessor getAccessor();
/**
- * Return the underlying buffers associated with this vector. Note that this doesn't impact the
- * reference counts for this buffer so it only should be used for in-context access. Also note
- * that this buffer changes regularly thus external classes shouldn't hold a reference to
- * it (unless they change it).
+ * Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for
+ * this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus
+ * external classes shouldn't hold a reference to it (unless they change it).
*
* @return The underlying ByteBuf.
*/
public abstract ByteBuf[] getBuffers();
-
+
/**
- * Load the data provided in the buffer. Typically used when deserializing from the wire.
- * @param metadata Metadata used to decode the incoming buffer.
- * @param buffer The buffer that contains the ValueVector.
+ * Load the data provided in the buffer. Typically used when deserializing from the wire.
+ *
+ * @param metadata
+ * Metadata used to decode the incoming buffer.
+ * @param buffer
+ * The buffer that contains the ValueVector.
*/
- public void load(FieldMetadata metadata, ByteBuf buffer);
-
+ public void load(SerializedField metadata, ByteBuf buffer);
+
/**
- * Get the metadata for this field. Used in serialization
+ * Get the metadata for this field. Used in serialization
+ *
* @return FieldMetadata for this field.
*/
- public FieldMetadata getMetadata();
-
+ public SerializedField getMetadata();
+
/**
* Get a Mutator to update this vectors data.
- *
+ *
* @return
*/
public abstract Mutator getMutator();
@@ -125,23 +137,25 @@ public interface ValueVector extends Closeable {
/**
* Get the Java Object representation of the element at the specified position. Useful for testing.
- *
+ *
* @param index
* Index of the value to get
*/
public abstract Object getObject(int index);
public int getValueCount();
-
+
public boolean isNull(int index);
public void reset();
+
+ public FieldReader getReader();
}
public interface Mutator {
/**
* Set the top number values (optional/required) or number of value groupings (repeated) in this vector.
- *
+ *
* @param valueCount
*/
public void setValueCount(int valueCount);
@@ -150,4 +164,5 @@ public interface ValueVector extends Closeable {
public void generateTestData(int values);
}
+
}
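Summarizing the interface change: allocation is split into a throwing form and a boolean "safe" form, load/getMetadata move to SerializedField, getReader() is added, and every vector is now Iterable over its children (leaf vectors return an empty iterator via BaseValueVector/BaseDataValueVector above). A hedged sketch of the two new idioms:

    // Equivalent to the documented allocateNew() contract.
    static void allocateOrThrow(ValueVector v) {
      if (!v.allocateNewSafe()) {              // false on failure, no throw
        throw new OutOfMemoryRuntimeException();
      }
    }

    // The Iterable contract: zero children for scalar vectors, one per field
    // for containers such as the new MapVector.
    static int childCount(ValueVector v) {
      int n = 0;
      for (ValueVector child : v) n++;
      return n;
    }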
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
index 32f08b0..adee171 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
@@ -40,6 +40,6 @@ public class GenericAccessor extends AbstractSqlAccessor {
@Override
TypeProtos.MajorType getType() {
- return v.getMetadata().getDef().getMajorType();
+ return v.getMetadata().getMajorType();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
new file mode 100644
index 0000000..ab1d270
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractContainerVector implements ValueVector{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
+
+ public abstract <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz);
+ public abstract <T extends ValueVector> T get(String name, Class<T> clazz);
+ public abstract int size();
+
+ protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz){
+ if(clazz.isAssignableFrom(v.getClass())){
+ return (T) v;
+ }else{
+ throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support heterogeneous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
+ }
+
+ public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
+
+
+ public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg){
+ if(seg == null){
+ if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+ return builder.finalType(this.getField().getType()).build();
+ }
+
+ if(seg.isArray()){
+
+ if(seg.isLastPath()){
+ if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+ return builder //
+ .remainder(seg) //
+ .finalType(this.getField().getType()) //
+ .withIndex() //
+ .build();
+ }else{
+ if(addToBreadCrumb){
+ addToBreadCrumb = false;
+ builder.remainder(seg);
+ }
+ // this is a complex array reference, which means it doesn't correspond directly to a vector by itself.
+ seg = seg.getChild();
+
+ }
+
+ }else{
+ // name segment.
+ }
+
+ VectorWithOrdinal vord = getVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath());
+ if(vord == null) return null;
+
+
+ if(addToBreadCrumb){
+ builder.intermediateType(this.getField().getType());
+ builder.addId(vord.ordinal);
+ }
+
+ ValueVector v = vord.vector;
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ return c.getFieldIdIfMatches(builder, addToBreadCrumb, seg.getChild());
+ }else{
+ if(seg.isLastPath()){
+ if(addToBreadCrumb) builder.intermediateType(v.getField().getType());
+ return builder.finalType(v.getField().getType()).build();
+ }else{
+ logger.warn("You tried to request a complex type inside a scalar object.");
+ return null;
+ }
+ }
+
+ }
+
+ protected boolean supportsDirectRead(){
+ return false;
+ }
+
+ protected class VectorWithOrdinal{
+ final ValueVector vector;
+ final int ordinal;
+
+ public VectorWithOrdinal(ValueVector v, int ordinal){
+ this.vector = v;
+ this.ordinal = ordinal;
+ }
+ }
+}
+
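AbstractContainerVector gives containers a common fetch-or-create surface; typeify enforces that a child requested under an existing name carries the expected vector class, and getFieldIdIfMatches walks a PathSegment down through nested containers. A hedged usage sketch against the MapVector implementation that follows (IntVector as the generated fixed-width class is an assumption here; the MajorType builder shape is copied from MapVector.TYPE):

    MapVector map = new MapVector("person", allocator);

    MajorType intType = MajorType.newBuilder()
        .setMinorType(MinorType.INT)
        .setMode(DataMode.REQUIRED)
        .build();

    // First call creates the child and registers it with the parent's
    // MaterializedField; later calls return the existing vector, and a
    // mismatched class argument raises IllegalStateException via typeify.
    IntVector age = map.addOrGet("age", intType, IntVector.class);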
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
new file mode 100644
index 0000000..f126e5c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public class AbstractMapVector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
new file mode 100644
index 0000000..91c0be5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
+import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.hive12.common.collect.Lists;
+
+public class MapVector extends AbstractContainerVector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
+
+ public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REQUIRED).build();
+
+ final HashMap<String, ValueVector> vectors = Maps.newHashMap();
+ private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
+ private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
+ private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
+ private final Accessor accessor = new Accessor();
+ private final Mutator mutator = new Mutator();
+ private final BufferAllocator allocator;
+ private MaterializedField field;
+ private int valueCount;
+
+ public MapVector(String path, BufferAllocator allocator){
+ this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE);
+ this.allocator = allocator;
+ }
+ public MapVector(MaterializedField field, BufferAllocator allocator){
+ this.field = field;
+ this.allocator = allocator;
+ }
+
+ public int size(){
+ return vectors.size();
+ }
+
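+  // Transfer pairs are cached so repeated copyFromSafe() calls against the same
+  // source vector do not rebuild the pair on every call.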
+ transient private MapTransferPair ephPair;
+ transient private MapSingleCopier ephPair2;
+
+ public boolean copyFromSafe(int fromIndex, int thisIndex, MapVector from){
+ if(ephPair == null || ephPair.from != from){
+ ephPair = (MapTransferPair) from.makeTransferPair(this);
+ }
+ return ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
+ public boolean copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from){
+ if(ephPair2 == null || ephPair2.from != from){
+ ephPair2 = from.makeSingularCopier(this);
+ }
+ return ephPair2.copySafe(fromSubIndex, thisIndex);
+ }
+
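+  // Returns the child vector registered under the given name, creating and
+  // registering a vector of the requested type if none exists yet.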
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+
+ if(v == null){
+ v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+ Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type));
+ put(name, v);
+ }
+ return typeify(v, clazz);
+  }
+
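+  // Registers a child vector under the next free ordinal and records it in this
+  // map's materialized field; registering a duplicate name is an error.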
+ protected void put(String name, ValueVector vv){
+ int ordinal = vectors.size();
+ if(vectors.put(name, vv) != null){
+      throw new IllegalStateException(String.format("Field %s is already present in this map.", name));
+ }
+ vectorIds.put(name, new VectorWithOrdinal(vv, ordinal));
+ vectorsById.put(ordinal, vv);
+ field.addChild(vv.getField());
+ }
+
+ @Override
+ protected boolean supportsDirectRead() {
+ return true;
+ }
+
+ public Iterator<String> fieldNameIterator(){
+ return vectors.keySet().iterator();
+ }
+
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ for(ValueVector v : vectors.values()){
+ if(!v.allocateNewSafe()) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public <T extends ValueVector> T get(String name, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+ if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
+ return typeify(v, clazz);
+ }
+
+ @Override
+ public int getBufferSize() {
+ if(valueCount == 0 || vectors.isEmpty()) return 0;
+ long buffer = 0;
+ for(ValueVector v : this){
+ buffer += v.getBufferSize();
+ }
+
+ return (int) buffer;
+ }
+
+ @Override
+ public void close() {
+ for(ValueVector v : this){
+ v.close();
+ }
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return vectors.values().iterator();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ @Override
+ public TransferPair getTransferPair() {
+ return new MapTransferPair(field.getPath());
+ }
+
+ @Override
+ public TransferPair makeTransferPair(ValueVector to) {
+    return new MapTransferPair((MapVector) to);
+ }
+
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return new MapTransferPair(ref);
+ }
+
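+  // Transfers or copies a map by pairing each child vector with the matching child
+  // in the destination map, creating destination children where necessary.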
+ private class MapTransferPair implements TransferPair{
+ private MapVector from = MapVector.this;
+ private TransferPair[] pairs;
+ private MapVector to;
+
+ public MapTransferPair(SchemaPath path){
+ MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator);
+ pairs = new TransferPair[vectors.size()];
+      int i = 0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ TransferPair otherSide = e.getValue().getTransferPair();
+ v.put(e.getKey(), otherSide.getTo());
+ pairs[i++] = otherSide;
+ }
+ this.to = v;
+ }
+
+ public MapTransferPair(MapVector to){
+ this.to = to;
+ pairs = new TransferPair[vectors.size()];
+      int i = 0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ int preSize = to.vectors.size();
+ ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
+ if(to.vectors.size() != preSize) v.allocateNew();
+ pairs[i++] = e.getValue().makeTransferPair(v);
+ }
+ }
+
+ @Override
+ public void transfer() {
+ for(TransferPair p : pairs){
+ p.transfer();
+ }
+ to.valueCount = valueCount;
+ clear();
+ }
+
+ @Override
+ public ValueVector getTo() {
+ return to;
+ }
+
+ @Override
+ public boolean copyValueSafe(int from, int to) {
+ for(TransferPair p : pairs){
+ if(!p.copyValueSafe(from, to)) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ @Override
+ public int getValueCapacity() {
+ if(this.vectors.isEmpty()) return 0;
+ return vectors.values().iterator().next().getValueCapacity();
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
+ }
+
+ @Override
+ public ByteBuf[] getBuffers() {
+ List<ByteBuf> bufs = Lists.newArrayList();
+ for(ValueVector v : vectors.values()){
+ for(ByteBuf b : v.getBuffers()){
+ bufs.add(b);
+ }
+ }
+ return bufs.toArray(new ByteBuf[bufs.size()]);
+ }
+
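+  // Deserialization: walk the serialized child fields and load each child vector
+  // from its slice of the incoming buffer.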
+ @Override
+ public void load(SerializedField metadata, ByteBuf buf) {
+ List<SerializedField> fields = metadata.getChildList();
+
+ int bufOffset = 0;
+ for (SerializedField fmd : fields) {
+ MaterializedField fieldDef = MaterializedField.create(fmd);
+
+      ValueVector v = vectors.get(fieldDef.getLastName());
+      if(v == null) {
+        // if we arrive here, we didn't have a matching vector, so create and register one.
+        v = TypeHelper.getNewVector(fieldDef, allocator);
+        put(fieldDef.getLastName(), v);
+      }
+      if (fmd.getValueCount() == 0){
+        v.clear();
+      } else {
+        v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      }
+      bufOffset += fmd.getBufferLength();
+ }
+ }
+
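+  // Serialization: the map's metadata is its own field definition plus each child's
+  // metadata, in the same order that getBuffers() emits the backing buffers.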
+ @Override
+ public SerializedField getMetadata() {
+ SerializedField.Builder b = getField() //
+ .getAsBuilder() //
+ .setBufferLength(getBufferSize()) //
+ .setValueCount(valueCount);
+
+ for(ValueVector v : vectors.values()){
+ b.addChild(v.getMetadata());
+ }
+ return b.build();
+ }
+
+ @Override
+ public Mutator getMutator() {
+ return mutator;
+ }
+
+ public class Accessor implements ValueVector.Accessor{
+
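+    // Materializes the map value at the given index as a java.util.Map keyed by
+    // field name; children whose value is null at that position are omitted.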
+ @Override
+ public Object getObject(int index) {
+ Map<String, Object> vv = Maps.newHashMap();
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ ValueVector v = e.getValue();
+ String k = e.getKey();
+ Object value = v.getAccessor().getObject(index);
+ if(value != null){
+ vv.put(k, value);
+ }
+ }
+ return vv;
+ }
+
+ public void get(int index, ComplexHolder holder){
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
+
+ @Override
+ public int getValueCount() {
+ return valueCount;
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public FieldReader getReader() {
+ return new SingleMapReaderImpl(MapVector.this);
+ }
+
+ }
+
+ public ValueVector getVectorById(int id){
+ return vectorsById.get(id);
+ }
+
+ public class Mutator implements ValueVector.Mutator{
+
+ @Override
+ public void setValueCount(int valueCount) {
+ for(ValueVector v : vectors.values()){
+ v.getMutator().setValueCount(valueCount);
+ }
+ MapVector.this.valueCount = valueCount;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void generateTestData(int values) {
+ }
+
+ }
+
+ @Override
+ public void clear() {
+ for(ValueVector v : vectors.values()){
+      v.clear();
+ }
+ }
+
+ public VectorWithOrdinal getVectorWithOrdinal(String name){
+ return vectorIds.get(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
new file mode 100644
index 0000000..6d86a64
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public interface Positionable {
+ public void setPosition(int index);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
new file mode 100644
index 0000000..93930b5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.expr.holders.RepeatedListHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+
+
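+/**
+ * A vector of lists. An offsets vector marks where each list begins and ends, while
+ * a single child vector holds all list elements laid out back to back.
+ */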
+public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
+
+ private final UInt4Vector offsets; // offsets to start of each record
+ private final BufferAllocator allocator;
+ private final Mutator mutator = new Mutator();
+ private final Accessor accessor = new Accessor();
+ private ValueVector vector;
+ private final MaterializedField field;
+ private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
+ private int allocationValueCount = 4000;
+ private int allocationMonitor = 0;
+
+ private int lastSet = 0;
+
+ private int valueCount;
+
+  public final static MajorType TYPE = Types.repeated(MinorType.LIST);
+
+ public RepeatedListVector(MaterializedField field, BufferAllocator allocator){
+ this.allocator = allocator;
+ this.offsets = new UInt4Vector(null, allocator);
+ this.field = field;
+ }
+
+ public int size(){
+ return vector != null ? 1 : 0;
+ }
+
+ public RepeatedListVector(SchemaPath path, BufferAllocator allocator){
+ this(MaterializedField.create(path, TYPE), allocator);
+ }
+
+ transient private RepeatedListTransferPair ephPair;
+
+ public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from){
+ if(ephPair == null || ephPair.from != from){
+ ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
+ }
+ return ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
+ public Mutator getMutator(){
+ return mutator;
+ }
+
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ if(!offsets.allocateNewSafe()) return false;
+
+ if(vector != null){
+ return vector.allocateNewSafe();
+ }else{
+ return true;
+ }
+  }
+
+ public class Mutator implements ValueVector.Mutator{
+
+ public void startNewGroup(int index) {
+ offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+ }
+
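+    // Makes room for one more element in the list at the given index by advancing
+    // its end offset; returns the child position to write into, or -1 if the
+    // offsets vector could not grow.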
+ public int add(int index){
+ int endOffset = index+1;
+ int currentChildOffset = offsets.getAccessor().get(endOffset);
+ int newChildOffset = currentChildOffset + 1;
+ boolean success = offsets.getMutator().setSafe(endOffset, newChildOffset);
+ lastSet = index;
+ if(!success) return -1;
+
+ // this is done at beginning so return the currentChildOffset, not the new offset.
+ return currentChildOffset;
+    }
+
+ @Override
+ public void setValueCount(int groupCount) {
+ populateEmpties(groupCount);
+ offsets.getMutator().setValueCount(groupCount+1);
+
+ if(vector != null){
+ int valueCount = offsets.getAccessor().get(groupCount);
+ vector.getMutator().setValueCount(valueCount);
+ }
+ }
+
+ @Override
+ public void reset() {
+ lastSet = 0;
+ }
+
+ @Override
+ public void generateTestData(int values) {
+ }
+
+ }
+
+ public class Accessor implements ValueVector.Accessor {
+
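+    // Materializes the list at the given index as a java.util.List by walking the
+    // child vector between this record's start and end offsets.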
+ @Override
+ public Object getObject(int index) {
+ List<Object> l = Lists.newArrayList();
+ int end = offsets.getAccessor().get(index+1);
+ for(int i = offsets.getAccessor().get(index); i < end; i++){
+ l.add(vector.getAccessor().getObject(i));
+ }
+ return l;
+ }
+
+ @Override
+ public int getValueCount() {
+ return offsets.getAccessor().getValueCount() - 1;
+ }
+
+ public void get(int index, RepeatedListHolder holder){
+ assert index <= getValueCapacity();
+ holder.start = offsets.getAccessor().get(index);
+ holder.end = offsets.getAccessor().get(index+1);
+ }
+
+ public void get(int index, ComplexHolder holder){
+ FieldReader reader = getReader();
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
+
+ public void get(int index, int arrayIndex, ComplexHolder holder){
+ RepeatedListHolder h = new RepeatedListHolder();
+ get(index, h);
+ int offset = h.start + arrayIndex;
+
+ if(offset >= h.end){
+ holder.reader = NullReader.INSTANCE;
+ }else{
+ FieldReader r = vector.getAccessor().getReader();
+ r.setPosition(offset);
+ holder.reader = r;
+ }
+    }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public FieldReader getReader() {
+ return reader;
+ }
+
+ }
+
+  @Override
+  public int getBufferSize() {
+    // guard against a list vector whose child was never set
+    return offsets.getBufferSize() + (vector == null ? 0 : vector.getBufferSize());
+  }
+
+ @Override
+ public void close() {
+ offsets.close();
+ if(vector != null) vector.close();
+ }
+
+ @Override
+ public void clear() {
+ lastSet = 0;
+ offsets.clear();
+ if(vector != null) vector.clear();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ @Override
+ public TransferPair getTransferPair() {
+ return new RepeatedListTransferPair(field.getPath());
+ }
+
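+  // Transfers or copies a repeated list by moving the offsets vector alongside a
+  // transfer pair for the single child vector.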
+ public class RepeatedListTransferPair implements TransferPair{
+ private final RepeatedListVector from = RepeatedListVector.this;
+ private final RepeatedListVector to;
+ private final TransferPair vectorTransfer;
+
+ private RepeatedListTransferPair(RepeatedListVector to){
+ this.to = to;
+ if(to.vector == null){
+ to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass());
+ to.vector.allocateNew();
+ }
+ this.vectorTransfer = vector.makeTransferPair(to.vector);
+ }
+
+ private RepeatedListTransferPair(SchemaPath path){
+ this.to = new RepeatedListVector(path, allocator);
+ vectorTransfer = vector.getTransferPair();
+ this.to.vector = vectorTransfer.getTo();
+ }
+
+ @Override
+ public void transfer() {
+ offsets.transferTo(to.offsets);
+ vectorTransfer.transfer();
+ to.valueCount = valueCount;
+ clear();
+ }
+
+ @Override
+ public ValueVector getTo() {
+ return to;
+ }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException();
+ }
+
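+    // Copies one whole list value: read the source list's bounds, append each
+    // element through the child transfer pair, then record the destination list's
+    // new end offset.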
+ @Override
+ public boolean copyValueSafe(int from, int to) {
+ RepeatedListHolder holder = new RepeatedListHolder();
+ accessor.get(from, holder);
+ int newIndex = this.to.offsets.getAccessor().get(to);
+ //todo: make this a bulk copy.
+ for(int i = holder.start; i < holder.end; i++, newIndex++){
+ if(!vectorTransfer.copyValueSafe(i, newIndex)) return false;
+ }
+      if(!this.to.offsets.getMutator().setSafe(to + 1, newIndex)) return false;
+
+ return true;
+ }
+
+ }
+
+ @Override
+ public TransferPair makeTransferPair(ValueVector to) {
+    if(!(to instanceof RepeatedListVector)) throw new IllegalArgumentException("You can't make a transfer pair from an incompatible vector type.");
+    return new RepeatedListTransferPair((RepeatedListVector) to);
+ }
+
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return new RepeatedListTransferPair(ref);
+ }
+
+ @Override
+ public int getValueCapacity() {
+ if(vector == null) return offsets.getValueCapacity() - 1;
+ return Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity());
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
+ }
+
+  @Override
+  public ByteBuf[] getBuffers() {
+    if(vector == null) return offsets.getBuffers();
+    return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers());
+  }
+
+ private void setVector(ValueVector v){
+ field.addChild(v.getField());
+ this.vector = v;
+ }
+
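+  // Deserialization: the offsets load from the front of the buffer; the remaining
+  // slice goes to the single child vector described by the first child field.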
+ @Override
+ public void load(SerializedField metadata, ByteBuf buf) {
+ SerializedField childField = metadata.getChildList().get(0);
+
+ int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+
+ MaterializedField fieldDef = MaterializedField.create(childField);
+ if(vector == null) {
+ setVector(TypeHelper.getNewVector(fieldDef, allocator));
+ }
+
+ if (childField.getValueCount() == 0){
+ vector.clear();
+ } else {
+ vector.load(childField, buf.slice(bufOffset, childField.getBufferLength()));
+ }
+ }
+
+ @Override
+ public SerializedField getMetadata() {
+ return getField() //
+ .getAsBuilder() //
+ .setBufferLength(getBufferSize()) //
+ .setValueCount(accessor.getValueCount()) //
+ .addChild(vector.getMetadata()) //
+ .build();
+ }
+
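+  // Carries the last written end offset forward so any groups skipped since the
+  // last write read back as empty lists.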
+ private void populateEmpties(int groupCount){
+ int previousEnd = offsets.getAccessor().get(lastSet + 1);
+ for(int i = lastSet + 2; i <= groupCount; i++){
+ offsets.getMutator().setSafe(i, previousEnd);
+ }
+ lastSet = groupCount - 1;
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Collections.singleton(vector).iterator();
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ Preconditions.checkArgument(name == null);
+
+ if(vector == null){
+ vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(), type), allocator);
+ }
+ return typeify(vector, clazz);
+ }
+
+ @Override
+ public <T extends ValueVector> T get(String name, Class<T> clazz) {
+ if(name != null) return null;
+ return typeify(vector, clazz);
+ }
+
+ @Override
+ public void allocateNew(int parentValueCount, int childValueCount) {
+ clear();
+ offsets.allocateNew(parentValueCount+1);
+ mutator.reset();
+ accessor.reset();
+ }
+
+ @Override
+ public int load(int parentValueCount, int childValueCount, ByteBuf buf) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VectorWithOrdinal getVectorWithOrdinal(String name) {
+ if(name != null) return null;
+ return new VectorWithOrdinal(vector, 0);
+ }
+}