You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/11/22 22:52:09 UTC
[07/12] drill git commit: DRILL-5941: Skip header / footer
improvements for Hive storage plugin
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
new file mode 100644
index 0000000..9df721b
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.hive.HiveFieldConverter;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public abstract class HiveAbstractReader extends AbstractRecordReader {
+ protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
+
+ protected final DrillBuf managedBuffer; // obtained in the constructor via fragmentContext.getManagedBuffer().reallocIfNeeded(256); passed to HiveUtilities.populateVector
+
+ protected HiveTableWithColumnCache table;
+ protected HivePartition partition; // null is treated as "non-partitioned table" throughout this class
+ protected Iterator<InputSplit> inputSplitsIterator; // splits not yet opened; an empty iterator when no splits were supplied
+ protected List<String> selectedColumnNames; // assigned in init(); stays null until init() has run
+ protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
+ protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
+ protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
+ protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
+ protected List<String> selectedPartitionNames = Lists.newArrayList();
+ protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
+ protected List<Object> selectedPartitionValues = Lists.newArrayList(); // only filled in init() when partition != null
+
+ // SerDe of the reading partition (or table if the table is non-partitioned)
+ protected SerDe partitionSerDe;
+
+ // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
+ // ObjectInspector).
+ protected StructObjectInspector partitionOI;
+
+ // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
+ // partition. If there are no schema changes then this is same as the partitionOI.
+ protected StructObjectInspector finalOI;
+
+ // Converter which converts data from partition schema to table schema.
+ protected Converter partTblObjectInspectorConverter; // null for non-partitioned tables (see init())
+
+ protected Object key; // key holder handed to reader.next(key, value); never assigned in this class
+ protected RecordReader<Object, Object> reader; // Hive record reader of the split currently being read; null when none is open
+ protected List<ValueVector> vectors = Lists.newArrayList(); // output vectors of the selected table columns, in selectedColumnNames order
+ protected List<ValueVector> pVectors = Lists.newArrayList(); // output vectors of the selected partition columns, in selectedPartitionNames order
+ protected boolean empty; // true when the reader was constructed with a null or empty split collection
+ protected HiveConf hiveConf;
+ protected FragmentContext fragmentContext;
+ protected String defaultPartitionValue; // read from hiveConf (ConfVars.DEFAULTPARTITIONNAME) in init()
+ protected final UserGroupInformation proxyUgi; // user that init() is executed as (see setup())
+ protected JobConf job; // created from hiveConf in init()
+
+
+ public static final int TARGET_RECORD_COUNT = 4000; // record-count threshold checked by AbstractRecordsInspector.isBatchFull()
+
+ public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+ FragmentContext context, final HiveConf hiveConf,
+ UserGroupInformation proxyUgi) throws ExecutionSetupException {
+ this.table = table;
+ this.partition = partition;
+ this.empty = (inputSplits == null || inputSplits.isEmpty());
+ this.inputSplitsIterator = empty ? Collections.<InputSplit>emptyIterator() : inputSplits.iterator();
+ this.hiveConf = hiveConf;
+ this.fragmentContext = context;
+ this.proxyUgi = proxyUgi;
+ this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
+ setColumns(projectedColumns);
+ }
+
+ public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader); // called at the end of init() with the table metadata properties and the freshly opened reader, only when a reader for the first split was created
+
+ /**
+  * Builds the job configuration, the SerDes and the object inspectors for the table (or partition),
+  * resolves which table columns and partition columns are selected, and opens a reader for the first
+  * input split. Fills the {@code selected*} lists that {@code setup} later uses to create output vectors.
+  *
+  * @throws ExecutionSetupException if any initialization step fails; the original failure is kept as the cause
+  */
+ private void init() throws ExecutionSetupException {
+   job = new JobConf(hiveConf);
+
+   // Get the configured default partition name
+   defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+
+   Properties tableProperties;
+   try {
+     tableProperties = HiveUtilities.getTableMetadata(table);
+     final Properties partitionProperties =
+         (partition == null) ? tableProperties : HiveUtilities.getPartitionMetadata(partition, table);
+     HiveUtilities.addConfToJob(job, partitionProperties);
+
+     final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
+     final StructObjectInspector tableOI = getStructOI(tableSerDe);
+
+     if (partition != null) {
+       partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
+       partitionOI = getStructOI(partitionSerDe);
+
+       finalOI = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
+       partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
+       job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
+     } else {
+       // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
+       partitionSerDe = tableSerDe;
+       partitionOI = tableOI;
+       partTblObjectInspectorConverter = null;
+       finalOI = tableOI;
+       job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
+     }
+
+     if (logger.isTraceEnabled()) {
+       for (StructField field : finalOI.getAllStructFieldRefs()) {
+         logger.trace("field in finalOI: {}", field.getClass().getName());
+       }
+       // Exactly one placeholder for the one argument (the message used to carry a stray second "{}").
+       logger.trace("partitionSerDe class is {}", partitionSerDe.getClass().getName());
+     }
+
+     // Get list of partition column names
+     final List<FieldSchema> partitionKeys = table.getPartitionKeys();
+     final List<String> partitionNames = Lists.newArrayList();
+     for (FieldSchema field : partitionKeys) {
+       partitionNames.add(field.getName());
+     }
+
+     // We should always get the columns names from ObjectInspector. For some of the tables (ex. avro) metastore
+     // may not contain the schema, instead it is derived from other sources such as table properties or external file.
+     // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
+     // ObjectInspector created from the SerDe object has the schema.
+     final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
+     final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
+
+     // Select list of columns for project pushdown into Hive SerDe readers.
+     final List<Integer> columnIds = Lists.newArrayList();
+     if (isStarQuery()) {
+       selectedColumnNames = tableColumnNames;
+       for (int i = 0; i < selectedColumnNames.size(); i++) {
+         columnIds.add(i);
+       }
+       selectedPartitionNames = partitionNames;
+     } else {
+       selectedColumnNames = Lists.newArrayList();
+       for (SchemaPath field : getColumns()) {
+         String columnName = field.getRootSegment().getPath();
+         if (partitionNames.contains(columnName)) {
+           selectedPartitionNames.add(columnName);
+         } else {
+           columnIds.add(tableColumnNames.indexOf(columnName));
+           selectedColumnNames.add(columnName);
+         }
+       }
+     }
+     ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
+
+     for (String columnName : selectedColumnNames) {
+       StructField fieldRef = finalOI.getStructFieldRef(columnName);
+       selectedStructFieldRefs.add(fieldRef);
+       ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+
+       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
+
+       selectedColumnObjInspectors.add(fieldOI);
+       selectedColumnTypes.add(typeInfo);
+       selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
+     }
+
+     // Only build the per-column trace arguments when trace logging is actually enabled.
+     if (logger.isTraceEnabled()) {
+       for (int i = 0; i < selectedColumnNames.size(); ++i) {
+         logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
+             selectedColumnObjInspectors.get(i).getTypeName(),
+             selectedColumnObjInspectors.get(i).getClass().getName(),
+             selectedColumnTypes.get(i).toString(),
+             selectedColumnFieldConverters.get(i).getClass().getName());
+       }
+     }
+
+     // Partition types (and values, when reading a partition) are collected in partition-key order.
+     for (int i = 0; i < partitionKeys.size(); i++) {
+       final FieldSchema field = partitionKeys.get(i);
+       if (selectedPartitionNames.contains(field.getName())) {
+         TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+         selectedPartitionTypes.add(pType);
+
+         if (partition != null) {
+           selectedPartitionValues.add(
+               HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
+         }
+       }
+     }
+   } catch (Exception e) {
+     throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
+   }
+
+   // Nothing to open when the reader was created without splits.
+   if (!empty && initNextReader(job)) {
+     internalInit(tableProperties, reader);
+   }
+ }
+
+ /**
+ * Initializes next reader if available, will close previous reader if any.
+ *
+ * @param job map / reduce job configuration.
+ * @return true if new reader was initialized, false is no more readers are available
+ * @throws ExecutionSetupException if could not init record reader
+ */
+ protected boolean initNextReader(JobConf job) throws ExecutionSetupException {
+ if (inputSplitsIterator.hasNext()) {
+ if (reader != null) {
+ closeReader();
+ }
+ InputSplit inputSplit = inputSplitsIterator.next();
+ try {
+ reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+ logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
+ } catch (Exception e) {
+ throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Utility method which creates a SerDe object for given SerDe class name and properties.
+ */
+ private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
+ final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
+ final SerDe serde = c.getConstructor().newInstance();
+ serde.initialize(job, properties);
+
+ return serde;
+ }
+
+ private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
+ ObjectInspector oi = serDe.getObjectInspector();
+ if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+ }
+ return (StructObjectInspector) oi;
+ }
+
+ /**
+  * Runs {@code init()} as the proxy user and then registers one output vector per selected table column
+  * and per selected partition column with the given mutator.
+  *
+  * @param context operator context used to run the initialization as {@code proxyUgi}
+  * @param output mutator the output vectors are added to
+  * @throws ExecutionSetupException if initialization fails or is interrupted, or if adding a field
+  *         causes a schema change exception
+  */
+ @Override
+ public void setup(OperatorContext context, OutputMutator output)
+     throws ExecutionSetupException {
+   // initializes "reader"
+   final Callable<Void> readerInitializer = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       init();
+       return null;
+     }
+   };
+
+   final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
+   try {
+     result.get();
+   } catch (InterruptedException e) {
+     result.cancel(true);
+     // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+     // interruption and respond to it if it wants to.
+     Thread.currentThread().interrupt();
+     // init() may not have completed, so the selected* lists cannot be trusted (selectedColumnNames may still
+     // be null). Fail the setup instead of continuing into vector creation.
+     throw new ExecutionSetupException("Interrupted while initializing Hive Reader " + this.getClass().getName(), e);
+   } catch (ExecutionException e) {
+     throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+   }
+
+   try {
+     final OptionManager options = fragmentContext.getOptions();
+     for (int i = 0; i < selectedColumnNames.size(); i++) {
+       MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
+       MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
+       Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+       vectors.add(output.addField(field, vvClass));
+     }
+
+     for (int i = 0; i < selectedPartitionNames.size(); i++) {
+       MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
+       MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
+       Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+       pVectors.add(output.addField(field, vvClass));
+     }
+   } catch (SchemaChangeException e) {
+     throw new ExecutionSetupException(e);
+   }
+ }
+
+ @Override
+ public abstract int next(); // redeclared abstract: this class provides no batch-reading logic of its own, concrete readers must implement it
+
+ /**
+  * Sets the value count on every table column vector and, when a partition is being read,
+  * fills the partition column vectors for the same number of records.
+  *
+  * @param recordCount number of records in the current batch
+  */
+ protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
+   for (int i = 0; i < vectors.size(); i++) {
+     vectors.get(i).getMutator().setValueCount(recordCount);
+   }
+
+   if (partition == null) {
+     return;
+   }
+   populatePartitionVectors(recordCount);
+ }
+
+ protected void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
+ for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
+ Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
+ if (hiveValue != null) {
+ selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+ vectors.get(i), outputRecordIndex);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ closeReader(); // closes the Hive record reader that is currently open, if any; failures are only logged
+ }
+
+ /**
+  * Closes the current record reader, if any. A failure to close is logged as a warning and not propagated.
+  * The reader reference is cleared in every case, so a reader whose {@code close()} failed is not
+  * closed a second time by a later call.
+  */
+ private void closeReader() {
+   if (reader == null) {
+     return;
+   }
+   try {
+     reader.close();
+   } catch (Exception e) {
+     logger.warn("Failure while closing Hive Record reader.", e);
+   } finally {
+     // Drop the reference even when close() threw; the reader must not be reused or closed again.
+     reader = null;
+   }
+ }
+
+ /**
+  * Allocates every partition column vector for the given number of records and, when the matching
+  * partition value is not null, fills the vector with that value for positions 0 to recordCount.
+  * The value count is set on every vector regardless of whether a value was written.
+  *
+  * @param recordCount number of records in the current batch
+  */
+ protected void populatePartitionVectors(int recordCount) {
+   int partitionIndex = 0;
+   for (ValueVector partitionVector : pVectors) {
+     final Object partitionValue = selectedPartitionValues.get(partitionIndex++);
+
+     AllocationHelper.allocateNew(partitionVector, recordCount);
+     if (partitionValue != null) {
+       HiveUtilities.populateVector(partitionVector, managedBuffer, partitionValue, 0, recordCount);
+     }
+     partitionVector.getMutator().setValueCount(recordCount);
+   }
+ }
+
+ /**
+  * Reads the next value into the given value holder.
+  * When the current reader is exhausted, the reader of the next input split is opened and tried,
+  * until a value is read or no splits are left.
+  *
+  * @param value value holder
+  * @return true if a value was written into the holder, false if all readers are exhausted
+  * @throws DrillRuntimeException if reading fails or the next reader cannot be initialized
+  */
+ protected boolean hasNextValue(Object value) {
+   try {
+     do {
+       if (reader.next(key, value)) {
+         return true;
+       }
+       // current split is exhausted: keep going only while another reader can be opened
+     } while (initNextReader(job));
+     return false;
+   } catch (IOException | ExecutionSetupException e) {
+     throw new DrillRuntimeException(e);
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
new file mode 100644
index 0000000..7f9e0c0
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Base class for reader initializers that build Hive readers of a given reader class through reflection.
+ * Provides the shared lookup of the reader constructor and the creation of reader instances;
+ * subclasses decide how many readers to create by implementing {@link #init()}.
+ */
+public abstract class AbstractReadersInitializer {
+
+  protected final HiveSubScan config;
+
+  private final FragmentContext context;
+  private final Class<? extends HiveAbstractReader> readerClass;
+  private final UserGroupInformation proxyUgi;
+
+  /**
+   * @param context fragment context handed to every created reader
+   * @param config Hive sub scan supplying table, columns, Hive configuration and user name
+   * @param readerClass reader class to instantiate
+   */
+  public AbstractReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    this.context = context;
+    this.config = config;
+    this.readerClass = readerClass;
+
+    final String scanUserName = config.getUserName();
+    final String queryUserName = context.getQueryUserName();
+    this.proxyUgi = ImpersonationUtil.createProxyUgi(scanUserName, queryUserName);
+  }
+
+  /**
+   * Looks up the public seven-argument constructor of the reader class
+   * (table, partition, input splits, columns, fragment context, Hive configuration, proxy user).
+   *
+   * @return reader constructor
+   * @throws DrillRuntimeException if the reader class has no such constructor
+   */
+  protected Constructor<? extends HiveAbstractReader> createReaderConstructor() {
+    final Class<?>[] signature = {
+        HiveTableWithColumnCache.class, HivePartition.class,
+        Collection.class,
+        List.class, FragmentContext.class, HiveConf.class, UserGroupInformation.class
+    };
+    try {
+      return readerClass.getConstructor(signature);
+    } catch (ReflectiveOperationException e) {
+      throw new DrillRuntimeException(String.format("Unable to retrieve constructor for Hive reader class [%s]", readerClass), e);
+    }
+  }
+
+  /**
+   * Instantiates a reader through the given constructor for the given partition and split.
+   *
+   * @param readerConstructor constructor obtained from {@link #createReaderConstructor()}
+   * @param partition partition to read, may be null
+   * @param split input splits passed as the constructor's collection argument, may be null
+   * @return new reader instance
+   * @throws DrillRuntimeException if the reflective instantiation fails
+   */
+  protected HiveAbstractReader createReader(Constructor<? extends HiveAbstractReader> readerConstructor, Partition partition, Object split) {
+    final Object[] arguments = {
+        config.getTable(), partition, split, config.getColumns(), context, config.getHiveConf(), proxyUgi
+    };
+    try {
+      return readerConstructor.newInstance(arguments);
+    } catch (ReflectiveOperationException e) {
+      throw new DrillRuntimeException(String.format("Unable to create instance for Hive reader [%s]", readerConstructor), e);
+    }
+  }
+
+  /**
+   * @return list of initialized records readers
+   */
+  public abstract List<RecordReader> init();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
new file mode 100644
index 0000000..a161151
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Creates one record reader per input split group of the sub scan.
+ * When the sub scan carries partitions, the group at index i is read with the partition at index i.
+ */
+public class DefaultReadersInitializer extends AbstractReadersInitializer {
+
+  public DefaultReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    super(context, config, readerClass);
+  }
+
+  /**
+   * @return one reader for every input split group, in the order of the groups
+   */
+  @Override
+  public List<RecordReader> init() {
+    final List<List<InputSplit>> splitGroups = config.getInputSplits();
+    final List<HivePartition> partitions = config.getPartitions();
+    final boolean partitioned = !(partitions == null || partitions.isEmpty());
+
+    final List<RecordReader> readers = new ArrayList<>(splitGroups.size());
+    final Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
+
+    int groupIndex = 0;
+    for (List<InputSplit> splitGroup : splitGroups) {
+      final HivePartition groupPartition = partitioned ? partitions.get(groupIndex) : null;
+      readers.add(createReader(readerConstructor, groupPartition, splitGroup));
+      groupIndex++;
+    }
+    return readers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
new file mode 100644
index 0000000..cb29f19
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Used when the table is empty: creates a single reader without partition and without input splits,
+ * so that the schema can still be output.
+ */
+public class EmptyReadersInitializer extends AbstractReadersInitializer {
+
+  public EmptyReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    super(context, config, readerClass);
+  }
+
+  /**
+   * @return list holding exactly one reader, created with null partition and null input splits
+   */
+  @Override
+  public List<RecordReader> init() {
+    final HiveAbstractReader schemaOnlyReader = createReader(createReaderConstructor(), null, null);
+
+    final List<RecordReader> readers = new ArrayList<>(1);
+    readers.add(schemaOnlyReader);
+    return readers;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
new file mode 100644
index 0000000..78aaf42
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.HiveAvroReader;
+import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
+import org.apache.drill.exec.store.hive.readers.HiveOrcReader;
+import org.apache.drill.exec.store.hive.readers.HiveParquetReader;
+import org.apache.drill.exec.store.hive.readers.HiveRCFileReader;
+import org.apache.drill.exec.store.hive.readers.HiveTextReader;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ReadersInitializer {
+
+  /**
+   * Reader classes for the Hive native formats ORC, Avro, RCFile, Parquet and Text,
+   * keyed by the canonical class name of the matching input format.
+   */
+  private static final Map<String, Class<? extends HiveAbstractReader>> READER_MAP = new HashMap<>();
+
+  static {
+    READER_MAP.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
+    READER_MAP.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
+    READER_MAP.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
+    READER_MAP.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
+    READER_MAP.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
+  }
+
+  /**
+   * Determines which reader initializer should be used for the given table configuration:
+   * the empty initializer when the sub scan has no input splits, the default initializer otherwise.
+   *
+   * @param context fragment context
+   * @param config Hive table config
+   * @return reader initializer
+   */
+  public static AbstractReadersInitializer getInitializer(FragmentContext context, HiveSubScan config) {
+    final Class<? extends HiveAbstractReader> readerClass = getReaderClass(config);
+    return config.getInputSplits().isEmpty()
+        ? new EmptyReadersInitializer(context, config, readerClass)
+        : new DefaultReadersInitializer(context, config, readerClass);
+  }
+
+  /**
+   * Looks up the reader class registered for the input format of the Hive table.
+   * If no reader class is registered for that format, the default reader class is used.
+   *
+   * @param config Hive table config
+   * @return reader class
+   */
+  private static Class<? extends HiveAbstractReader> getReaderClass(HiveSubScan config) {
+    // The map never stores null values, so a null result means "no reader registered for this format".
+    final Class<? extends HiveAbstractReader> registered = READER_MAP.get(config.getTable().getSd().getInputFormat());
+    if (registered == null) {
+      return HiveDefaultReader.class;
+    }
+    return registered;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
new file mode 100644
index 0000000..026c3d1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+
+/**
+ * Parent class for records inspectors which are responsible for counting of processed records
+ * and managing free and used value holders.
+ */
+public abstract class AbstractRecordsInspector {
+
+ private int processedRecordCount;
+
+ /**
+ * Checks if current number of processed records has reached max batch size.
+ *
+ * @return true if reached max number of records in batch
+ */
+ public boolean isBatchFull() {
+ return processedRecordCount >= HiveAbstractReader.TARGET_RECORD_COUNT;
+ }
+
+ /**
+ * @return number of processed records
+ */
+ public int getProcessedRecordCount() {
+ return processedRecordCount;
+ }
+
+ /**
+ * Increments current number of processed records.
+ */
+ public void incrementProcessedRecordCount() {
+ processedRecordCount++;
+ }
+
+ /**
+ * When batch of data was sent, number of processed records should be reset.
+ */
+ public void reset() {
+ processedRecordCount = 0;
+ }
+
+ /**
+ * Returns value holder where next value will be written.
+ *
+ * @return value holder
+ */
+ public abstract Object getValueHolder();
+
+ /**
+ * @return value holder with written value
+ */
+ public abstract Object getNextValue();
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
new file mode 100644
index 0000000..2ffa64a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+/**
+ * Default records inspector that uses the same value holder for each record.
+ * Each value once written is immediately processed thus value holder can be re-used.
+ */
+public class DefaultRecordsInspector extends AbstractRecordsInspector {
+
+ private final Object value;
+
+ public DefaultRecordsInspector(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public Object getValueHolder() {
+ return value;
+ }
+
+ @Override
+ public Object getNextValue() {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
new file mode 100644
index 0000000..eee4df0
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * To implement skip footer logic this records inspector will buffer N number of incoming read records in queue
+ * and make sure they are skipped when input is fully processed. FIFO method of queuing is used for these purposes.
+ */
+public class SkipFooterRecordsInspector extends AbstractRecordsInspector {
+
+ private final int footerCount;
+ private Queue<Object> footerBuffer;
+ private final List<Object> valueHolders;
+ private long readRecordsCount;
+
+ public SkipFooterRecordsInspector(RecordReader<Object, Object> reader, int footerCount) {
+ this.footerCount = footerCount;
+ this.footerBuffer = new LinkedList<>();
+ this.valueHolders = initializeValueHolders(reader, footerCount);
+ }
+
+ /**
+ * Returns next available value holder where value should be written from the cached value holders.
+ * Current available holder index is the number of actually read records modulo the holders quantity.
+ * Modulo is applied to the long counter before narrowing to int, so the index can never be negative.
+ * @return value holder
+ */
+ @Override
+ public Object getValueHolder() {
+ int availableHolderIndex = (int) (readRecordsCount % valueHolders.size());
+ return valueHolders.get(availableHolderIndex);
+ }
+
+ /**
+ * Buffers current value holder with written value
+ * and returns last buffered value if number of buffered values exceeds N records to skip.
+ *
+ * @return next available value holder with written value, null otherwise
+ */
+ @Override
+ public Object getNextValue() {
+ footerBuffer.add(getValueHolder());
+ readRecordsCount++;
+ if (footerBuffer.size() <= footerCount) {
+ return null;
+ }
+ return footerBuffer.poll();
+ }
+
+ /**
+ * Creates buffer of value holders, so these holders can be re-used.
+ * Holders quantity depends on number of lines to skip in the end of the file plus one.
+ *
+ * @param reader record reader
+ * @param footerCount number of lines to skip at the end of the file
+ * @return list of value holders
+ */
+ private List<Object> initializeValueHolders(RecordReader<Object, Object> reader, int footerCount) {
+ List<Object> valueHolder = new ArrayList<>(footerCount + 1);
+ for (int i = 0; i <= footerCount; i++) {
+ valueHolder.add(reader.createValue());
+ }
+ return valueHolder;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 33b8ec0..c07c9d8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -492,33 +492,43 @@ public class TestHiveStorage extends HiveTestBase {
}
}
- @Test // DRILL-3688
- public void testIgnoreSkipHeaderFooterForRcfile() throws Exception {
+ @Test
+ public void testTableWithHeaderOnly() throws Exception {
testBuilder()
- .sqlQuery("select count(1) as cnt from hive.skipper.kv_rcfile_large")
+ .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_header_only")
.unOrdered()
.baselineColumns("cnt")
- .baselineValues(5000L)
+ .baselineValues(0L)
.go();
}
- @Test // DRILL-3688
- public void testIgnoreSkipHeaderFooterForParquet() throws Exception {
+ @Test
+ public void testTableWithFooterOnly() throws Exception {
testBuilder()
- .sqlQuery("select count(1) as cnt from hive.skipper.kv_parquet_large")
+ .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_footer_only")
.unOrdered()
.baselineColumns("cnt")
- .baselineValues(5000L)
+ .baselineValues(0L)
.go();
}
- @Test // DRILL-3688
- public void testIgnoreSkipHeaderFooterForSequencefile() throws Exception {
+ @Test
+ public void testTableWithHeaderFooterOnly() throws Exception {
testBuilder()
- .sqlQuery("select count(1) as cnt from hive.skipper.kv_sequencefile_large")
+ .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_header_footer_only")
.unOrdered()
.baselineColumns("cnt")
- .baselineValues(5000L)
+ .baselineValues(0L)
+ .go();
+ }
+
+ @Test
+ public void testSkipHeaderFooterForPartitionedTable() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_with_part")
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(4980L)
.go();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 8e5c77b..3e06316 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -67,9 +67,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("hive.skipper", "kv_text_large")
.baselineValues("hive.skipper", "kv_incorrect_skip_header")
.baselineValues("hive.skipper", "kv_incorrect_skip_footer")
- .baselineValues("hive.skipper", "kv_rcfile_large")
- .baselineValues("hive.skipper", "kv_parquet_large")
- .baselineValues("hive.skipper", "kv_sequencefile_large")
+ .baselineValues("hive.skipper", "kv_text_header_only")
+ .baselineValues("hive.skipper", "kv_text_footer_only")
+ .baselineValues("hive.skipper", "kv_text_header_footer_only")
+ .baselineValues("hive.skipper", "kv_text_with_part")
.go();
}
@@ -255,9 +256,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_footer", "TABLE")
- .baselineValues("DRILL", "hive.skipper", "kv_rcfile_large", "TABLE")
- .baselineValues("DRILL", "hive.skipper", "kv_parquet_large", "TABLE")
- .baselineValues("DRILL", "hive.skipper", "kv_sequencefile_large", "TABLE")
+ .baselineValues("DRILL", "hive.skipper", "kv_text_header_only", "TABLE")
+ .baselineValues("DRILL", "hive.skipper", "kv_text_footer_only", "TABLE")
+ .baselineValues("DRILL", "hive.skipper", "kv_text_header_footer_only", "TABLE")
+ .baselineValues("DRILL", "hive.skipper", "kv_text_with_part", "TABLE")
.go();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 4fafadb..924d7cb 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -518,35 +518,37 @@ public class HiveTestDataGenerator {
// Create text tables with skip header and footer table property
executeQuery(hiveDriver, "create database if not exists skipper");
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_small", "textfile", "1", "1"));
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_small", "textfile", "1", "1", false));
executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_small", 5, 1, 1));
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_large", "textfile", "2", "2"));
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_large", "textfile", "2", "2", false));
executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_large", 5000, 2, 2));
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_header", "textfile", "A", "1"));
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_header", "textfile", "A", "1", false));
executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_header", 5, 1, 1));
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_footer", "textfile", "1", "A"));
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_footer", "textfile", "1", "A", false));
executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_footer", 5, 1, 1));
- // Create rcfile table with skip header and footer table property
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_rcfile_large", "rcfile", "1", "1"));
- executeQuery(hiveDriver, "insert into table skipper.kv_rcfile_large select * from skipper.kv_text_large");
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_header_only", "textfile", "5", "0", false));
+ executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_header_only", 0, 5, 0));
- // Create parquet table with skip header and footer table property
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_parquet_large", "parquet", "1", "1"));
- executeQuery(hiveDriver, "insert into table skipper.kv_parquet_large select * from skipper.kv_text_large");
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_footer_only", "textfile", "0", "5", false));
+ executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_footer_only", 0, 0, 5));
- // Create sequencefile table with skip header and footer table property
- executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_sequencefile_large", "sequencefile", "1", "1"));
- executeQuery(hiveDriver, "insert into table skipper.kv_sequencefile_large select * from skipper.kv_text_large");
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_header_footer_only", "textfile", "5", "5", false));
+ executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_header_footer_only", 0, 5, 5));
+
+ executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_with_part", "textfile", "5", "5", true));
+ executeQuery(hiveDriver, "insert overwrite table skipper.kv_text_with_part partition (part) " +
+ "select key, value, key % 2 as part from skipper.kv_text_large");
+
+ // Create a table based on json file
+ executeQuery(hiveDriver, "create table default.simple_json(json string)");
+ final String loadData = "load data local inpath '" +
+ Resources.getResource("simple.json") + "' into table default.simple_json";
+ executeQuery(hiveDriver, loadData);
- // Create a table based on json file
- executeQuery(hiveDriver, "create table default.simple_json(json string)");
- final String loadData = String.format("load data local inpath '" +
- Resources.getResource("simple.json") + "' into table default.simple_json");
- executeQuery(hiveDriver, loadData);
ss.close();
}
@@ -604,23 +606,41 @@ public class HiveTestDataGenerator {
return file.getPath();
}
- private String createTableWithHeaderFooterProperties(String tableName, String format, String headerValue, String footerValue) {
- return String.format("create table %s (key int, value string) stored as %s tblproperties('%s'='%s', '%s'='%s')",
- tableName, format, serdeConstants.HEADER_COUNT, headerValue, serdeConstants.FOOTER_COUNT, footerValue);
+ private String createTableWithHeaderFooterProperties(String tableName,
+ String format,
+ String headerValue,
+ String footerValue,
+ boolean hasPartitions) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("create table ").append(tableName);
+ sb.append(" (key int, value string) ");
+ if (hasPartitions) {
+ sb.append("partitioned by (part bigint) ");
+ }
+ sb.append(" stored as ").append(format);
+ sb.append(" tblproperties(");
+ sb.append("'").append(serdeConstants.HEADER_COUNT).append("'='").append(headerValue).append("'");
+ sb.append(",");
+ sb.append("'").append(serdeConstants.FOOTER_COUNT).append("'='").append(footerValue).append("'");
+ sb.append(")");
+
+ return sb.toString();
}
private String generateTestDataWithHeadersAndFooters(String tableName, int rowCount, int headerLines, int footerLines) {
StringBuilder sb = new StringBuilder();
sb.append("insert into table ").append(tableName).append(" (key, value) values ");
- int length = sb.length();
sb.append(StringUtils.repeat("('key_header', 'value_header')", ",", headerLines));
+ if (headerLines > 0) {
+ sb.append(",");
+ }
for (int i = 1; i <= rowCount; i++) {
- sb.append(",(").append(i).append(",").append("'key_").append(i).append("')");
+ sb.append("(").append(i).append(",").append("'key_").append(i).append("'),");
}
- if (headerLines <= 0) {
- sb.deleteCharAt(length);
+ if (footerLines <= 0) {
+ sb.deleteCharAt(sb.length() - 1);
}
- sb.append(StringUtils.repeat(",('key_footer', 'value_footer')", footerLines));
+ sb.append(StringUtils.repeat("('key_footer', 'value_footer')", ",", footerLines));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
new file mode 100644
index 0000000..92970ed
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
@@ -0,0 +1,84 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive.inspectors;
+
+import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SkipFooterRecordsInspectorTest {
+
+ private static RecordReader<Object, Object> recordReader;
+
+ @BeforeClass
+ @SuppressWarnings("unchecked")
+ public static void init() {
+ recordReader = mock(RecordReader.class);
+ when(recordReader.createValue()).thenReturn(new Object());
+ }
+
+ @Test
+ public void testHolderReUsage() {
+ SkipFooterRecordsInspector inspector = new SkipFooterRecordsInspector(recordReader, 1);
+ // store first value holder
+ Object firstHolder = inspector.getValueHolder();
+
+ // return null since one record was buffered as footer
+ assertNull(inspector.getNextValue());
+
+ // store second value holder
+ Object secondHolder = inspector.getValueHolder();
+
+ // return value stored in first holder, as now second holder is buffering the footer
+ assertEquals(secondHolder, inspector.getValueHolder());
+ assertEquals(firstHolder, inspector.getNextValue());
+
+ // return value stored in second holder, as now first holder is buffering the footer
+ assertEquals(firstHolder, inspector.getValueHolder());
+ assertEquals(secondHolder, inspector.getNextValue());
+ }
+
+ @Test
+ public void testReset() {
+ SkipFooterRecordsInspector inspector = new SkipFooterRecordsInspector(recordReader, 2);
+ assertEquals(0, inspector.getProcessedRecordCount());
+
+ // store second holder
+ inspector.getNextValue();
+ Object secondHolder = inspector.getValueHolder();
+ inspector.getNextValue();
+
+ // process n records and increment count, so we stop at second holder
+ for (int i = 0; i < 4; i++) {
+ inspector.getNextValue();
+ inspector.incrementProcessedRecordCount();
+ }
+ assertEquals(4, inspector.getProcessedRecordCount());
+ assertEquals(secondHolder, inspector.getValueHolder());
+
+ // reset and make sure we start from the last available holder
+ inspector.reset();
+ assertEquals(0, inspector.getProcessedRecordCount());
+ assertEquals(secondHolder, inspector.getValueHolder());
+ }
+}