Posted to commits@drill.apache.org by jn...@apache.org on 2016/12/20 04:27:35 UTC
[6/8] drill git commit: DRILL-5032: Drill query on hive parquet table failed with OutOfMemoryError: Java heap space
DRILL-5032: Drill query on hive parquet table failed with OutOfMemoryError: Java heap space
close apache/drill#654
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/03928af0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/03928af0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/03928af0
Branch: refs/heads/master
Commit: 03928af0b5cafd52e5b153aa852e5642b505f2c6
Parents: d8cc710
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Thu Oct 27 19:20:27 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Dec 19 15:57:36 2016 -0800
----------------------------------------------------------------------
.../codegen/templates/HiveRecordReaders.java | 4 +-
.../planner/sql/HivePartitionDescriptor.java | 12 +-
...onvertHiveParquetScanToDrillParquetScan.java | 14 +-
.../drill/exec/store/hive/ColumnListsCache.java | 95 ++++
.../store/hive/DrillHiveMetaStoreClient.java | 40 +-
.../exec/store/hive/HiveAbstractReader.java | 15 +-
.../store/hive/HiveDrillNativeParquetScan.java | 4 +-
.../hive/HiveDrillNativeScanBatchCreator.java | 6 +-
.../exec/store/hive/HiveMetadataProvider.java | 11 +-
.../drill/exec/store/hive/HivePartition.java | 61 +++
.../drill/exec/store/hive/HiveReadEntry.java | 31 +-
.../apache/drill/exec/store/hive/HiveScan.java | 17 +-
.../exec/store/hive/HiveScanBatchCreator.java | 8 +-
.../drill/exec/store/hive/HiveSubScan.java | 15 +-
.../apache/drill/exec/store/hive/HiveTable.java | 382 ---------------
.../store/hive/HiveTableWithColumnCache.java | 76 +++
.../drill/exec/store/hive/HiveTableWrapper.java | 466 +++++++++++++++++++
.../drill/exec/store/hive/HiveUtilities.java | 41 +-
.../exec/store/hive/schema/DrillHiveTable.java | 7 +-
.../drill/exec/TestHivePartitionPruning.java | 29 +-
.../exec/hive/TestInfoSchemaOnHiveStorage.java | 2 +
.../exec/store/hive/HiveTestDataGenerator.java | 9 +
.../store/hive/schema/TestColumnListCache.java | 111 +++++
23 files changed, 980 insertions(+), 476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
index 0dc8c08..7d6733e 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -43,8 +43,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.security.UserGroupInformation;
@@ -75,7 +73,7 @@ public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
Object value;
</#if>
- public Hive${entry.hiveReader}Reader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+ public Hive${entry.hiveReader}Reader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
FragmentContext context, final HiveConf hiveConf,
UserGroupInformation proxyUgi) throws ExecutionSetupException {
super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index d42aea7..b22c14d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -31,10 +31,10 @@ import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.hive.HiveTableWrapper;
import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTable;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -64,7 +64,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
this.scanRel = scanRel;
this.managedBuffer = managedBuffer.reallocIfNeeded(256);
this.defaultPartitionValue = defaultPartitionValue;
- for (HiveTable.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
+ for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
partitionMap.put(wrapper.name, i);
i++;
}
@@ -115,7 +115,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
}
for(ValueVector v : vectors) {
- if(v == null){
+ if (v == null) {
continue;
}
v.getMutator().setValueCount(partitions.size());
@@ -166,10 +166,10 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
private GroupScan createNewGroupScan(List<PartitionLocation> newPartitionLocations) throws ExecutionSetupException {
HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
- List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
- List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+ List<HiveTableWrapper.HivePartitionWrapper> oldPartitions = origReadEntry.partitions;
+ List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList();
- for (HiveTable.HivePartition part: oldPartitions) {
+ for (HiveTableWrapper.HivePartitionWrapper part: oldPartitions) {
String partitionLocation = part.getPartition().getSd().getLocation();
for (PartitionLocation newPartitionLocation: newPartitionLocations) {
if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) {
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 228308f..df85ca0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -39,10 +39,10 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -97,23 +97,23 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
final HiveConf hiveConf = hiveScan.getHiveConf();
- final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+ final HiveTableWithColumnCache hiveTable = hiveScan.hiveReadEntry.getTable();
final Class<? extends InputFormat<?,?>> tableInputFormat =
- getInputFormatFromSD(MetaStoreUtils.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(),
+ getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(),
hiveConf);
if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
return false;
}
- final List<HivePartition> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
+ final List<HivePartitionWrapper> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
if (partitions == null) {
return true;
}
final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
// Make sure all partitions have the same input format as the table input format
- for (HivePartition partition : partitions) {
+ for (HivePartitionWrapper partition : partitions) {
final StorageDescriptor partitionSD = partition.getPartition().getSd();
Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(
HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD,
@@ -179,7 +179,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
getPartitionColMapping(hiveTable, partitionColumnLabel);
final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
- if(hiveScanRel.getRowType().getFieldCount() == 0) {
+ if (hiveScanRel.getRowType().getFieldCount() == 0) {
call.transformTo(nativeScanRel);
} else {
final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
new file mode 100644
index 0000000..ae4baa1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class represents a cache for partition and table column lists.
+ * It is used to reduce the size of the physical plan for Hive tables:
+ * only unique column lists are stored in the cache, and partitions refer
+ * to them by index. The table's own columns are stored at index 0.
+ */
+public class ColumnListsCache {
+ // contains immutable column lists
+ private final List<List<FieldSchema>> fields;
+
+ // maps each column list to its position in the fields list
+ private final Map<List<FieldSchema>, Integer> keys;
+
+ public ColumnListsCache(Table table) {
+ this();
+ // the table's columns are stored at index 0
+ addOrGet(table.getSd().getCols());
+ }
+
+ public ColumnListsCache() {
+ this.fields = Lists.newArrayList();
+ this.keys = Maps.newHashMap();
+ }
+
+ /**
+ * Checks whether the column list has been added before and, if so, returns its position.
+ * If the list is new, adds it to the fields list and returns its position.
+ * Returns -1 if {@code columns} is null.
+ *
+ * @param columns list of columns
+ * @return index of {@code columns}, or -1 if {@code columns} is null
+ */
+ public int addOrGet(List<FieldSchema> columns) {
+ if (columns == null) {
+ return -1;
+ }
+ Integer index = keys.get(columns);
+ if (index != null) {
+ return index;
+ } else {
+ index = fields.size();
+ final List<FieldSchema> immutableList = ImmutableList.copyOf(columns);
+ fields.add(immutableList);
+ keys.put(immutableList, index);
+ return index;
+ }
+ }
+
+ /**
+ * Returns the list of columns at the specified position in the fields list,
+ * or null if the index is negative or not less than the size of the fields list.
+ *
+ * @param index index of the column list to return
+ * @return the list of columns at the specified position in the fields list,
+ * or null if the index is out of range
+ */
+ public List<FieldSchema> getColumns(int index) {
+ if (index >= 0 && index < fields.size()) {
+ return fields.get(index);
+ } else {
+ return null;
+ }
+ }
+
+ public List<List<FieldSchema>> getFields() {
+ return Lists.newArrayList(fields);
+ }
+}
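For illustration, a minimal sketch of how the new cache deduplicates column lists. This snippet is not part of the commit; it assumes only the ColumnListsCache class added above, Hive's FieldSchema, and Guava's Lists.

import org.apache.drill.exec.store.hive.ColumnListsCache;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import com.google.common.collect.Lists;
import java.util.List;

public class ColumnListsCacheSketch {
  public static void main(String[] args) {
    ColumnListsCache cache = new ColumnListsCache();

    List<FieldSchema> colsA = Lists.newArrayList(
        new FieldSchema("id", "int", null),
        new FieldSchema("name", "string", null));
    // An equal but distinct list, as many partitions share one schema.
    List<FieldSchema> colsB = Lists.newArrayList(
        new FieldSchema("id", "int", null),
        new FieldSchema("name", "string", null));

    System.out.println(cache.addOrGet(colsA)); // 0: new list, stored
    System.out.println(cache.addOrGet(colsB)); // 0: equal list, deduplicated
    System.out.println(cache.addOrGet(null));  // -1 by contract
  }
}

Because equal lists map to one stored copy, a table with thousands of identically-shaped partitions serializes its schema once rather than once per partition, which is what shrinks the physical plan.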
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
index d7ba659..92efdc7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
@@ -23,8 +23,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import org.apache.calcite.schema.Schema;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.util.ImpersonationUtil;
@@ -270,9 +268,9 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
/** Helper method which gets table metadata. Retries once if the first call to fetch the metadata fails */
protected static HiveReadEntry getHiveReadEntryHelper(final IMetaStoreClient mClient, final String dbName,
final String tableName) throws TException {
- Table t = null;
+ Table table = null;
try {
- t = mClient.getTable(dbName, tableName);
+ table = mClient.getTable(dbName, tableName);
} catch (MetaException | NoSuchObjectException e) {
throw e;
} catch (TException e) {
@@ -283,10 +281,10 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
logger.warn("Failure while attempting to close existing hive metastore connection. May leak connection.", ex);
}
mClient.reconnect();
- t = mClient.getTable(dbName, tableName);
+ table = mClient.getTable(dbName, tableName);
}
- if (t == null) {
+ if (table == null) {
throw new UnknownTableException(String.format("Unable to find table '%s'.", tableName));
}
@@ -306,16 +304,34 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
partitions = mClient.listPartitions(dbName, tableName, (short) -1);
}
- List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
- for (Partition part : partitions) {
- hivePartitions.add(new HiveTable.HivePartition(part));
+ List<HiveTableWrapper.HivePartitionWrapper> hivePartitionWrappers = Lists.newArrayList();
+ HiveTableWithColumnCache hiveTable = new HiveTableWithColumnCache(table, new ColumnListsCache(table));
+ for (Partition partition : partitions) {
+ hivePartitionWrappers.add(createPartitionWithSpecColumns(hiveTable, partition));
}
- if (hivePartitions.size() == 0) {
- hivePartitions = null;
+ if (hivePartitionWrappers.isEmpty()) {
+ hivePartitionWrappers = null;
}
- return new HiveReadEntry(new HiveTable(t), hivePartitions);
+ return new HiveReadEntry(new HiveTableWrapper(hiveTable), hivePartitionWrappers);
+ }
+
+ /**
+ * Helper method that stores the partition's columns in the table's column lists cache.
+ * If the cache already contains an identical column list, the partition stores only
+ * the index of that list. Otherwise the column list is added to the cache and the
+ * partition stores the index of the newly added list.
+ *
+ * @param table hive table instance
+ * @param partition partition instance
+ * @return hive partition wrapper
+ */
+ public static HiveTableWrapper.HivePartitionWrapper createPartitionWithSpecColumns(HiveTableWithColumnCache table, Partition partition) {
+ int listIndex = table.getColumnListsCache().addOrGet(partition.getSd().getCols());
+ HivePartition hivePartition = new HivePartition(partition, listIndex);
+ return new HiveTableWrapper.HivePartitionWrapper(hivePartition);
}
/**
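For context, a sketch of the pattern used by getHiveReadEntryHelper above. It is illustrative only, but it uses exactly the constructors and helper added in this commit: the metastore table and its partitions are wrapped so each partition records a cache index instead of a full copy of its column list.

  // Mirrors getHiveReadEntryHelper above; illustrative only.
  static HiveReadEntry wrapTable(Table table, List<Partition> partitions) {
    HiveTableWithColumnCache hiveTable =
        new HiveTableWithColumnCache(table, new ColumnListsCache(table));
    List<HiveTableWrapper.HivePartitionWrapper> wrappers = Lists.newArrayList();
    for (Partition partition : partitions) {
      // Each call stores the partition's columns in the shared cache (or finds
      // an equal list) and records only the resulting index in the wrapper.
      wrappers.add(createPartitionWithSpecColumns(hiveTable, partition));
    }
    return new HiveReadEntry(new HiveTableWrapper(hiveTable),
        wrappers.isEmpty() ? null : wrappers);
  }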
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
index 107fc66..8c6df84 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
@@ -39,10 +39,7 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -67,8 +64,8 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
protected final DrillBuf managedBuffer;
- protected Table table;
- protected Partition partition;
+ protected HiveTableWithColumnCache table;
+ protected HivePartition partition;
protected InputSplit inputSplit;
protected List<String> selectedColumnNames;
protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
@@ -106,9 +103,9 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
protected static final int TARGET_RECORD_COUNT = 4000;
- public HiveAbstractReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
- FragmentContext context, final HiveConf hiveConf,
- UserGroupInformation proxyUgi) throws ExecutionSetupException {
+ public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+ FragmentContext context, final HiveConf hiveConf,
+ UserGroupInformation proxyUgi) throws ExecutionSetupException {
this.table = table;
this.partition = partition;
this.inputSplit = inputSplit;
@@ -130,7 +127,7 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
Properties tableProperties;
try {
- tableProperties = MetaStoreUtils.getTableMetadata(table);
+ tableProperties = HiveUtilities.getTableMetadata(table);
final Properties partitionProperties =
(partition == null) ? tableProperties :
HiveUtilities.getPartitionMetadata(partition, table);
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 17cae22..ccec61a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
import java.io.IOException;
import java.util.List;
@@ -103,7 +103,7 @@ public class HiveDrillNativeParquetScan extends HiveScan {
@Override
public String toString() {
- final List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+ final List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
int numPartitions = partitions == null ? 0 : partitions.size();
return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
+ ", columns=" + columns
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 42db5d0..66f41e2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -42,8 +42,6 @@ import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -62,9 +60,9 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
@Override
public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
- final Table table = config.getTable();
+ final HiveTableWithColumnCache table = config.getTable();
final List<InputSplit> splits = config.getInputSplits();
- final List<Partition> partitions = config.getPartitions();
+ final List<HivePartition> partitions = config.getPartitions();
final List<SchemaPath> columns = config.getColumns();
final String partitionDesignator = context.getOptions()
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 49f7689..e80b37b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -83,7 +82,7 @@ public class HiveMetadataProvider {
public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
final Stopwatch timeGetStats = Stopwatch.createStarted();
- final Table table = hiveReadEntry.getTable();
+ final HiveTableWithColumnCache table = hiveReadEntry.getTable();
try {
if (!isPartitionedTable) {
final Properties properties = MetaStoreUtils.getTableMetadata(table);
@@ -96,7 +95,7 @@ public class HiveMetadataProvider {
return getStatsEstimateFromInputSplits(getTableInputSplits());
} else {
final HiveStats aggStats = new HiveStats(0, 0);
- for(Partition partition : hiveReadEntry.getPartitions()) {
+ for(HivePartition partition : hiveReadEntry.getPartitions()) {
final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
HiveStats stats = getStatsFromProps(properties);
@@ -124,7 +123,7 @@ public class HiveMetadataProvider {
return tableInputSplits;
}
- final Properties properties = MetaStoreUtils.getTableMetadata(hiveReadEntry.getTable());
+ final Properties properties = HiveUtilities.getTableMetadata(hiveReadEntry.getTable());
tableInputSplits = splitInputWithUGI(properties, hiveReadEntry.getTable().getSd(), null);
return tableInputSplits;
@@ -133,7 +132,7 @@ public class HiveMetadataProvider {
/** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
* metadata cache requests for the same partition(s).
*/
- private List<InputSplitWrapper> getPartitionInputSplits(final Partition partition) throws Exception {
+ private List<InputSplitWrapper> getPartitionInputSplits(final HivePartition partition) throws Exception {
if (partitionInputSplitMap.containsKey(partition)) {
return partitionInputSplitMap.get(partition);
}
@@ -161,7 +160,7 @@ public class HiveMetadataProvider {
}
final List<InputSplitWrapper> splits = Lists.newArrayList();
- for (Partition p : hiveReadEntry.getPartitions()) {
+ for (HivePartition p : hiveReadEntry.getPartitions()) {
splits.addAll(getPartitionInputSplits(p));
}
return splits;
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
new file mode 100644
index 0000000..ad539b1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wraps the {@link Partition} class and stores additional information,
+ * such as the index of this partition's column list in the column lists cache.
+ */
+public class HivePartition extends Partition {
+ // index of partition column list in the table's column list cache
+ private int columnListIndex;
+
+ public HivePartition(
+ List<String> values,
+ String dbName,
+ String tableName,
+ int createTime,
+ int lastAccessTime,
+ StorageDescriptor sd,
+ Map<String,String> parameters,
+ int columnListIndex)
+ {
+ super(values, dbName, tableName, createTime, lastAccessTime, sd, parameters);
+ this.columnListIndex = columnListIndex;
+ }
+
+ public HivePartition(Partition other, int columnListIndex) {
+ super(other);
+ this.columnListIndex = columnListIndex;
+ }
+
+ /**
+ * To reduce the size of the physical plan for Hive tables, a partition does not store
+ * its column list directly; it stores the index of that list in the table's column lists cache.
+ *
+ * @return index of the partition's column list in the table's column lists cache
+ */
+ public int getColumnListIndex() {
+ return columnListIndex;
+ }
+}
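To make the indirection concrete: a partition now resolves its columns through the owning table's cache. A minimal sketch, assuming only the accessors added in this commit (getColumnListIndex here, getColumnListsCache and getColumns on the table side):

  // Illustrative only: the partition stores just an index; the cache holds
  // the actual immutable column list.
  static List<FieldSchema> partitionColumns(HiveTableWithColumnCache table,
                                            HivePartition partition) {
    return table.getColumnListsCache().getColumns(partition.getColumnListIndex());
  }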
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 4df33ec..0cf7433 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -21,9 +21,7 @@ import java.util.List;
import org.apache.calcite.schema.Schema.TableType;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -33,42 +31,47 @@ import com.google.common.collect.Lists;
public class HiveReadEntry {
@JsonProperty("table")
- public HiveTable table;
+ public HiveTableWrapper table;
@JsonProperty("partitions")
- public List<HiveTable.HivePartition> partitions;
+ public List<HivePartitionWrapper> partitions;
@JsonIgnore
- private List<Partition> partitionsUnwrapped = Lists.newArrayList();
+ private List<HivePartition> partitionsUnwrapped = Lists.newArrayList();
@JsonCreator
- public HiveReadEntry(@JsonProperty("table") HiveTable table,
- @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
+ public HiveReadEntry(@JsonProperty("table") HiveTableWrapper table,
+ @JsonProperty("partitions") List<HivePartitionWrapper> partitions) {
this.table = table;
this.partitions = partitions;
if (partitions != null) {
- for(HiveTable.HivePartition part : partitions) {
+ for(HivePartitionWrapper part : partitions) {
partitionsUnwrapped.add(part.getPartition());
}
}
}
@JsonIgnore
- public Table getTable() {
+ public HiveTableWithColumnCache getTable() {
return table.getTable();
}
@JsonIgnore
- public List<Partition> getPartitions() {
+ public HiveTableWrapper getTableWrapper() {
+ return table;
+ }
+
+ @JsonIgnore
+ public List<HivePartition> getPartitions() {
return partitionsUnwrapped;
}
@JsonIgnore
- public HiveTable getHiveTableWrapper() {
+ public HiveTableWrapper getHiveTableWrapper() {
return table;
}
@JsonIgnore
- public List<HivePartition> getHivePartitionWrappers() {
+ public List<HivePartitionWrapper> getHivePartitionWrappers() {
return partitions;
}
@@ -81,7 +84,7 @@ public class HiveReadEntry {
return TableType.TABLE;
}
- public String getPartitionLocation(HiveTable.HivePartition partition) {
+ public String getPartitionLocation(HivePartitionWrapper partition) {
String partitionPath = table.getTable().getSd().getLocation();
for (String value: partition.values) {
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 1a58cbd..c6cc8a2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -40,9 +40,10 @@ import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.InputSplit;
@@ -56,6 +57,8 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
+
@JsonTypeName("hive-scan")
public class HiveScan extends AbstractGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
@@ -151,12 +154,14 @@ public class HiveScan extends AbstractGroupScan {
public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
final List<InputSplitWrapper> splits = mappings.get(minorFragmentId);
- List<HivePartition> parts = Lists.newArrayList();
+ List<HivePartitionWrapper> parts = Lists.newArrayList();
final List<String> encodedInputSplits = Lists.newArrayList();
final List<String> splitTypes = Lists.newArrayList();
for (final InputSplitWrapper split : splits) {
- if (split.getPartition() != null) {
- parts.add(new HivePartition(split.getPartition()));
+ final Partition splitPartition = split.getPartition();
+ if (splitPartition != null) {
+ HiveTableWithColumnCache table = hiveReadEntry.getTable();
+ parts.add(createPartitionWithSpecColumns(new HiveTableWithColumnCache(table, new ColumnListsCache(table)), splitPartition));
}
encodedInputSplits.add(serializeInputSplit(split.getSplit()));
@@ -166,7 +171,7 @@ public class HiveScan extends AbstractGroupScan {
parts = null;
}
- final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts);
+ final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, storagePlugin);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
@@ -259,7 +264,7 @@ public class HiveScan extends AbstractGroupScan {
@Override
public String toString() {
- List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+ List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
int numPartitions = partitions == null ? 0 : partitions.size();
return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
+ ", columns=" + columns
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 7aece71..47ea323 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -30,8 +30,6 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -63,9 +61,9 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
List<RecordReader> readers = Lists.newArrayList();
- Table table = config.getTable();
+ HiveTableWithColumnCache table = config.getTable();
List<InputSplit> splits = config.getInputSplits();
- List<Partition> partitions = config.getPartitions();
+ List<HivePartition> partitions = config.getPartitions();
boolean hasPartitions = (partitions != null && partitions.size() > 0);
int i = 0;
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(),
@@ -80,7 +78,7 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
}
Constructor<? extends HiveAbstractReader> readerConstructor = null;
try {
- readerConstructor = readerClass.getConstructor(Table.class, Partition.class,
+ readerConstructor = readerClass.getConstructor(HiveTableWithColumnCache.class, HivePartition.class,
InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
UserGroupInformation.class);
for (InputSplit split : splits) {
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 74b68a6..107188c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.hive;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -27,19 +26,13 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.InputSplit;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -58,9 +51,9 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@JsonIgnore
protected List<InputSplit> inputSplits = Lists.newArrayList();
@JsonIgnore
- protected Table table;
+ protected HiveTableWithColumnCache table;
@JsonIgnore
- protected List<Partition> partitions;
+ protected List<HivePartition> partitions;
@JsonIgnore
protected HiveStoragePlugin storagePlugin;
@@ -112,11 +105,11 @@ public class HiveSubScan extends AbstractBase implements SubScan {
return splits;
}
- public Table getTable() {
+ public HiveTableWithColumnCache getTable() {
return table;
}
- public List<Partition> getPartitions() {
+ public List<HivePartition> getPartitions() {
return partitions;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
deleted file mode 100644
index b6dd079..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
-
-@JsonTypeName("table")
-public class HiveTable {
-
- @JsonIgnore
- private Table table;
-
- @JsonProperty
- public String tableName;
- @JsonProperty
- public String dbName;
- @JsonProperty
- public String owner;
- @JsonProperty
- public int createTime;
- @JsonProperty
- public int lastAccessTime;
- @JsonProperty
- public int retention;
- @JsonProperty
- public StorageDescriptorWrapper sd;
- @JsonProperty
- public List<FieldSchemaWrapper> partitionKeys;
- @JsonProperty
- public Map<String,String> parameters;
- @JsonProperty
- public String viewOriginalText;
- @JsonProperty
- public String viewExpandedText;
- @JsonProperty
- public String tableType;
-
- @JsonIgnore
- public final Map<String, String> partitionNameTypeMap = new HashMap<>();
-
- @JsonCreator
- public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime,
- @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
- @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
- @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType
- ) {
- this.tableName = tableName;
- this.dbName = dbName;
- this.owner = owner;
- this.createTime = createTime;
- this.lastAccessTime = lastAccessTime;
- this.retention = retention;
- this.sd = sd;
- this.partitionKeys = partitionKeys;
- this.parameters = parameters;
- this.viewOriginalText = viewOriginalText;
- this.viewExpandedText = viewExpandedText;
- this.tableType = tableType;
-
- List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
- for (FieldSchemaWrapper w : partitionKeys) {
- partitionKeysUnwrapped.add(w.getFieldSchema());
- partitionNameTypeMap.put(w.name, w.type);
- }
- StorageDescriptor sdUnwrapped = sd.getSd();
- this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
- parameters, viewOriginalText, viewExpandedText, tableType);
- }
-
- public HiveTable(Table table) {
- if (table == null) {
- return;
- }
- this.table = table;
- this.tableName = table.getTableName();
- this.dbName = table.getDbName();
- this.owner = table.getOwner();
- this.createTime = table.getCreateTime();
- this.lastAccessTime = table.getLastAccessTime();
- this.retention = table.getRetention();
- this.sd = new StorageDescriptorWrapper(table.getSd());
- this.partitionKeys = Lists.newArrayList();
- for (FieldSchema f : table.getPartitionKeys()) {
- this.partitionKeys.add(new FieldSchemaWrapper(f));
- partitionNameTypeMap.put(f.getName(), f.getType());
- }
- this.parameters = table.getParameters();
- this.viewOriginalText = table.getViewOriginalText();
- this.viewExpandedText = table.getViewExpandedText();
- this.tableType = table.getTableType();
- }
-
- @JsonIgnore
- public Table getTable() {
- return table;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("Table(");
-
- sb.append("dbName:");
- sb.append(this.dbName);
- sb.append(", ");
-
- sb.append("tableName:");
- sb.append(this.tableName);
- sb.append(")");
-
- return sb.toString();
- }
-
- public static class HivePartition {
-
- @JsonIgnore
- private Partition partition;
-
- @JsonProperty
- public List<String> values;
- @JsonProperty
- public String tableName;
- @JsonProperty
- public String dbName;
- @JsonProperty
- public int createTime;
- @JsonProperty
- public int lastAccessTime;
- @JsonProperty
- public StorageDescriptorWrapper sd;
- @JsonProperty
- public Map<String,String> parameters;
-
- @JsonCreator
- public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
- @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd,
- @JsonProperty("parameters") Map<String, String> parameters
- ) {
- this.values = values;
- this.tableName = tableName;
- this.dbName = dbName;
- this.createTime = createTime;
- this.lastAccessTime = lastAccessTime;
- this.sd = sd;
- this.parameters = parameters;
-
- StorageDescriptor sdUnwrapped = sd.getSd();
- this.partition = new org.apache.hadoop.hive.metastore.api.Partition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters);
- }
-
- public HivePartition(Partition partition) {
- if (partition == null) {
- return;
- }
- this.partition = partition;
- this.values = partition.getValues();
- this.tableName = partition.getTableName();
- this.dbName = partition.getDbName();
- this.createTime = partition.getCreateTime();
- this.lastAccessTime = partition.getLastAccessTime();
- this.sd = new StorageDescriptorWrapper(partition.getSd());
- this.parameters = partition.getParameters();
- }
-
- @JsonIgnore
- public Partition getPartition() {
- return partition;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("Partition(");
- sb.append("values:");
- sb.append(this.values);
- sb.append(")");
- return sb.toString();
- }
- }
-
- public static class StorageDescriptorWrapper {
- @JsonIgnore
- private StorageDescriptor sd;
- @JsonProperty
- public List<FieldSchemaWrapper> cols;
- @JsonProperty
- public String location;
- @JsonProperty
- public String inputFormat;
- @JsonProperty
- public String outputFormat;
- @JsonProperty
- public boolean compressed;
- @JsonProperty
- public int numBuckets;
- @JsonProperty
- public SerDeInfoWrapper serDeInfo;
- // @JsonProperty
-// public List<String> bucketCols;
- @JsonProperty
- public List<OrderWrapper> sortCols;
- @JsonProperty
- public Map<String,String> parameters;
-
- @JsonCreator
- public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
- @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
- @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo, @JsonProperty("sortCols") List<OrderWrapper> sortCols,
- @JsonProperty("parameters") Map<String,String> parameters) {
- this.cols = cols;
- this.location = location;
- this.inputFormat = inputFormat;
- this.outputFormat = outputFormat;
- this.compressed = compressed;
- this.numBuckets = numBuckets;
- this.serDeInfo = serDeInfo;
-// this.bucketCols = bucketCols;
- this.sortCols = sortCols;
- this.parameters = parameters;
- List<FieldSchema> colsUnwrapped = Lists.newArrayList();
- for (FieldSchemaWrapper w: cols) {
- colsUnwrapped.add(w.getFieldSchema());
- }
- SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
- List<Order> sortColsUnwrapped = Lists.newArrayList();
- for (OrderWrapper w : sortCols) {
- sortColsUnwrapped.add(w.getOrder());
- }
-// this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
-// bucketCols, sortColsUnwrapped, parameters);
- this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
- null, sortColsUnwrapped, parameters);
- }
-
- public StorageDescriptorWrapper(StorageDescriptor sd) {
- this.sd = sd;
- this.cols = Lists.newArrayList();
- for (FieldSchema f : sd.getCols()) {
- this.cols.add(new FieldSchemaWrapper(f));
- }
- this.location = sd.getLocation();
- this.inputFormat = sd.getInputFormat();
- this.outputFormat = sd.getOutputFormat();
- this.compressed = sd.isCompressed();
- this.numBuckets = sd.getNumBuckets();
- this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo());
-// this.bucketCols = sd.getBucketCols();
- this.sortCols = Lists.newArrayList();
- for (Order o : sd.getSortCols()) {
- this.sortCols.add(new OrderWrapper(o));
- }
- this.parameters = sd.getParameters();
- }
-
- @JsonIgnore
- public StorageDescriptor getSd() {
- return sd;
- }
-
- }
-
- public static class SerDeInfoWrapper {
- @JsonIgnore
- private SerDeInfo serDeInfo;
- @JsonProperty
- public String name;
- @JsonProperty
- public String serializationLib;
- @JsonProperty
- public Map<String,String> parameters;
-
- @JsonCreator
- public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
- this.name = name;
- this.serializationLib = serializationLib;
- this.parameters = parameters;
- this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
- }
-
- public SerDeInfoWrapper(SerDeInfo serDeInfo) {
- this.serDeInfo = serDeInfo;
- this.name = serDeInfo.getName();
- this.serializationLib = serDeInfo.getSerializationLib();
- this.parameters = serDeInfo.getParameters();
- }
-
- @JsonIgnore
- public SerDeInfo getSerDeInfo() {
- return serDeInfo;
- }
- }
-
- public static class FieldSchemaWrapper {
- @JsonIgnore
- private FieldSchema fieldSchema;
- @JsonProperty
- public String name;
- @JsonProperty
- public String type;
- @JsonProperty
- public String comment;
-
- @JsonCreator
- public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
- this.name = name;
- this.type = type;
- this.comment = comment;
- this.fieldSchema = new FieldSchema(name, type, comment);
- }
-
- public FieldSchemaWrapper(FieldSchema fieldSchema) {
- this.fieldSchema = fieldSchema;
- this.name = fieldSchema.getName();
- this.type = fieldSchema.getType();
- this.comment = fieldSchema.getComment();
- }
-
- @JsonIgnore
- public FieldSchema getFieldSchema() {
- return fieldSchema;
- }
- }
-
- public static class OrderWrapper {
- @JsonIgnore
- private Order ord;
- @JsonProperty
- public String col;
- @JsonProperty
- public int order;
-
- @JsonCreator
- public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
- this.col = col;
- this.order = order;
- }
-
- public OrderWrapper(Order ord) {
- this.ord = ord;
- this.col = ord.getCol();
- this.order = ord.getOrder();
- }
-
- @JsonIgnore
- public Order getOrder() {
- return ord;
- }
- }
-
- public Map<String, String> getPartitionNameTypeMap() {
- return partitionNameTypeMap;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
new file mode 100644
index 0000000..91888ef
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wraps the {@link Table} class and stores additional information,
+ * such as the column lists cache.
+ */
+public class HiveTableWithColumnCache extends Table {
+
+ private ColumnListsCache columnListsCache;
+
+ public HiveTableWithColumnCache() {
+ super();
+ }
+
+ public HiveTableWithColumnCache(
+ String tableName,
+ String dbName,
+ String owner,
+ int createTime,
+ int lastAccessTime,
+ int retention,
+ StorageDescriptor sd,
+ List<FieldSchema> partitionKeys,
+ Map<String,String> parameters,
+ String viewOriginalText,
+ String viewExpandedText,
+ String tableType,
+ ColumnListsCache columnListsCache) {
+ super(tableName, dbName, owner, createTime, lastAccessTime, retention, sd,
+ partitionKeys, parameters, viewOriginalText, viewExpandedText, tableType);
+ this.columnListsCache = columnListsCache;
+ }
+
+ public HiveTableWithColumnCache(HiveTableWithColumnCache other) {
+ super(other);
+ columnListsCache = other.getColumnListsCache();
+ }
+
+ public HiveTableWithColumnCache(Table other, ColumnListsCache columnListsCache) {
+ super(other);
+ this.columnListsCache = columnListsCache;
+ }
+
+ /**
+ * To reduce the size of the physical plan for Hive tables, the unique column
+ * lists of the table and its partitions are stored in the table's column lists cache.
+ *
+ * @return table's column lists cache
+ */
+ public ColumnListsCache getColumnListsCache() {
+ return columnListsCache;
+ }
+}
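
The cache referenced above keeps each distinct column list once and hands out an index for it, so a partition can carry only its columnListIndex instead of a full schema. A minimal sketch of that contract, assuming addOrGet returns the list's index, as its use for columnListIndex elsewhere in this patch suggests; the class and table schemas below are illustrative only:

import java.util.List;

import org.apache.drill.exec.store.hive.ColumnListsCache;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

import com.google.common.collect.Lists;

public class ColumnListsCacheSketch {
  public static void main(String[] args) {
    ColumnListsCache cache = new ColumnListsCache();

    List<FieldSchema> oldSchema = Lists.newArrayList(
        new FieldSchema("a", "date", null),
        new FieldSchema("b", "timestamp", null));
    List<FieldSchema> newSchema = Lists.newArrayList(
        new FieldSchema("a1", "int", null),
        new FieldSchema("b", "timestamp", null));

    int first = cache.addOrGet(oldSchema);   // assumed to return 0
    int second = cache.addOrGet(newSchema);  // assumed to return 1
    int again = cache.addOrGet(oldSchema);   // same list, deduplicated: 0 again

    // A deserialized partition restores its schema by index.
    List<FieldSchema> restored = cache.getColumns(again);
    System.out.println(first + " " + second + " " + restored);
  }
}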
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
new file mode 100644
index 0000000..7f2afa6
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Lists;
+
+@JsonTypeName("table")
+public class HiveTableWrapper {
+
+ @JsonIgnore
+ private HiveTableWithColumnCache table;
+
+ @JsonProperty
+ public String tableName;
+ @JsonProperty
+ public String dbName;
+ @JsonProperty
+ public String owner;
+ @JsonProperty
+ public int createTime;
+ @JsonProperty
+ public int lastAccessTime;
+ @JsonProperty
+ public int retention;
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+ @JsonProperty
+ public List<FieldSchemaWrapper> partitionKeys;
+ @JsonProperty
+ public Map<String,String> parameters;
+ @JsonProperty
+ public String viewOriginalText;
+ @JsonProperty
+ public String viewExpandedText;
+ @JsonProperty
+ public String tableType;
+ @JsonProperty
+ public ColumnsCacheWrapper columnsCache;
+
+ @JsonIgnore
+ public final Map<String, String> partitionNameTypeMap = new HashMap<>();
+
+ @JsonCreator
+ public HiveTableWrapper(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner,
+ @JsonProperty("createTime") int createTime, @JsonProperty("lastAccessTime") int lastAccessTime,
+ @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
+ @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText,
+ @JsonProperty("tableType") String tableType, @JsonProperty("columnsCache") ColumnsCacheWrapper columnsCache
+ ) {
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.owner = owner;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.retention = retention;
+ this.sd = sd;
+ this.partitionKeys = partitionKeys;
+ this.parameters = parameters;
+ this.viewOriginalText = viewOriginalText;
+ this.viewExpandedText = viewExpandedText;
+ this.tableType = tableType;
+ this.columnsCache = columnsCache;
+
+ List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper w : partitionKeys) {
+ partitionKeysUnwrapped.add(w.getFieldSchema());
+ partitionNameTypeMap.put(w.name, w.type);
+ }
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.table = new HiveTableWithColumnCache(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
+ parameters, viewOriginalText, viewExpandedText, tableType, columnsCache.getColumnListsCache());
+ }
+
+ public HiveTableWrapper(HiveTableWithColumnCache table) {
+ if (table == null) {
+ return;
+ }
+ this.table = table;
+ this.tableName = table.getTableName();
+ this.dbName = table.getDbName();
+ this.owner = table.getOwner();
+ this.createTime = table.getCreateTime();
+ this.lastAccessTime = table.getLastAccessTime();
+ this.retention = table.getRetention();
+ this.sd = new StorageDescriptorWrapper(table.getSd());
+ this.partitionKeys = Lists.newArrayList();
+ for (FieldSchema f : table.getPartitionKeys()) {
+ this.partitionKeys.add(new FieldSchemaWrapper(f));
+ partitionNameTypeMap.put(f.getName(), f.getType());
+ }
+ this.parameters = table.getParameters();
+ this.viewOriginalText = table.getViewOriginalText();
+ this.viewExpandedText = table.getViewExpandedText();
+ this.tableType = table.getTableType();
+ this.columnsCache = new ColumnsCacheWrapper(table.getColumnListsCache());
+ }
+
+ @JsonIgnore
+ public HiveTableWithColumnCache getTable() {
+ return table;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Table(");
+
+ sb.append("dbName:");
+ sb.append(this.dbName);
+ sb.append(", ");
+
+ sb.append("tableName:");
+ sb.append(this.tableName);
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+ /**
+ * Wrapper for the {@link Partition} class. Used for serialization and deserialization of {@link HivePartition}.
+ */
+ public static class HivePartitionWrapper {
+
+ @JsonIgnore
+ private HivePartition partition;
+
+ @JsonProperty
+ public List<String> values;
+
+ @JsonProperty
+ public String tableName;
+
+ @JsonProperty
+ public String dbName;
+
+ @JsonProperty
+ public int createTime;
+
+ @JsonProperty
+ public int lastAccessTime;
+
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+
+ @JsonProperty
+ public Map<String, String> parameters;
+
+ @JsonProperty
+ private int columnListIndex;
+
+ @JsonCreator
+ public HivePartitionWrapper(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName,
+ @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
+ @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("parameters") Map<String, String> parameters, @JsonProperty("columnListIndex") int columnListIndex) {
+ this.values = values;
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.sd = sd;
+ this.parameters = parameters;
+ this.columnListIndex = columnListIndex;
+
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.partition = new HivePartition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters, columnListIndex);
+ }
+
+ public HivePartitionWrapper(HivePartition partition) {
+ if (partition == null) {
+ return;
+ }
+ this.partition = partition;
+ this.values = partition.getValues();
+ this.tableName = partition.getTableName();
+ this.dbName = partition.getDbName();
+ this.createTime = partition.getCreateTime();
+ this.lastAccessTime = partition.getLastAccessTime();
+ this.sd = new StorageDescriptorWrapper(partition.getSd());
+ this.parameters = partition.getParameters();
+ this.columnListIndex = partition.getColumnListIndex();
+ }
+
+ @JsonIgnore
+ public HivePartition getPartition() {
+ return partition;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Partition(");
+ sb.append("values:");
+ sb.append(this.values);
+ sb.append(")");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Wrapper for the {@link StorageDescriptor} class.
+ * Used in {@link HivePartitionWrapper} and {@link HiveTableWrapper}
+ * for serialization and deserialization of {@link StorageDescriptor}.
+ */
+ public static class StorageDescriptorWrapper {
+
+ @JsonIgnore
+ private StorageDescriptor sd;
+
+ // column lists are stored in the table-level ColumnListsCache, not serialized here
+ @JsonIgnore
+ public List<FieldSchemaWrapper> columns;
+
+ @JsonProperty
+ public String location;
+
+ @JsonProperty
+ public String inputFormat;
+
+ @JsonProperty
+ public String outputFormat;
+
+ @JsonProperty
+ public boolean compressed;
+
+ @JsonProperty
+ public int numBuckets;
+
+ @JsonProperty
+ public SerDeInfoWrapper serDeInfo;
+
+ @JsonProperty
+ public List<OrderWrapper> sortCols;
+
+ @JsonProperty
+ public Map<String, String> parameters;
+
+ @JsonCreator
+ public StorageDescriptorWrapper(@JsonProperty("columns") List<FieldSchemaWrapper> columns, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
+ @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo, @JsonProperty("sortCols") List<OrderWrapper> sortCols,
+ @JsonProperty("parameters") Map<String,String> parameters) {
+ this.columns = columns;
+ this.location = location;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.compressed = compressed;
+ this.numBuckets = numBuckets;
+ this.serDeInfo = serDeInfo;
+ this.sortCols = sortCols;
+ this.parameters = parameters;
+ List<FieldSchema> colsUnwrapped;
+ if (columns != null) {
+ colsUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper fieldSchema : columns) {
+ colsUnwrapped.add(fieldSchema.getFieldSchema());
+ }
+ } else {
+ colsUnwrapped = null;
+ }
+ SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
+ List<Order> sortColsUnwrapped;
+ if (sortCols != null) {
+ sortColsUnwrapped = Lists.newArrayList();
+ for (OrderWrapper order : sortCols) {
+ sortColsUnwrapped.add(order.getOrder());
+ }
+ } else {
+ sortColsUnwrapped = null;
+ }
+ sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat,
+ compressed, numBuckets, serDeInfoUnwrapped, null, sortColsUnwrapped, parameters);
+ }
+
+ public StorageDescriptorWrapper(StorageDescriptor storageDescriptor) {
+ sd = storageDescriptor;
+ location = storageDescriptor.getLocation();
+ inputFormat = storageDescriptor.getInputFormat();
+ outputFormat = storageDescriptor.getOutputFormat();
+ compressed = storageDescriptor.isCompressed();
+ numBuckets = storageDescriptor.getNumBuckets();
+ serDeInfo = new SerDeInfoWrapper(storageDescriptor.getSerdeInfo());
+ if (sd.getSortCols() != null) {
+ sortCols = Lists.newArrayList();
+ for (Order order : sd.getSortCols()) {
+ sortCols.add(new OrderWrapper(order));
+ }
+ }
+ parameters = storageDescriptor.getParameters();
+ if (sd.getCols() != null) {
+ this.columns = Lists.newArrayList();
+ for (FieldSchema fieldSchema : sd.getCols()) {
+ this.columns.add(new FieldSchemaWrapper(fieldSchema));
+ }
+ }
+ }
+
+ @JsonIgnore
+ public StorageDescriptor getSd() {
+ return sd;
+ }
+ }
+
+ public static class SerDeInfoWrapper {
+ @JsonIgnore
+ private SerDeInfo serDeInfo;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String serializationLib;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
+ this.name = name;
+ this.serializationLib = serializationLib;
+ this.parameters = parameters;
+ this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
+ }
+
+ public SerDeInfoWrapper(SerDeInfo serDeInfo) {
+ this.serDeInfo = serDeInfo;
+ this.name = serDeInfo.getName();
+ this.serializationLib = serDeInfo.getSerializationLib();
+ this.parameters = serDeInfo.getParameters();
+ }
+
+ @JsonIgnore
+ public SerDeInfo getSerDeInfo() {
+ return serDeInfo;
+ }
+ }
+
+ public static class FieldSchemaWrapper {
+ @JsonIgnore
+ private FieldSchema fieldSchema;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String type;
+ @JsonProperty
+ public String comment;
+
+ @JsonCreator
+ public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
+ this.fieldSchema = new FieldSchema(name, type, comment);
+ }
+
+ public FieldSchemaWrapper(FieldSchema fieldSchema) {
+ this.fieldSchema = fieldSchema;
+ this.name = fieldSchema.getName();
+ this.type = fieldSchema.getType();
+ this.comment = fieldSchema.getComment();
+ }
+
+ @JsonIgnore
+ public FieldSchema getFieldSchema() {
+ return fieldSchema;
+ }
+ }
+
+ public static class OrderWrapper {
+ @JsonIgnore
+ private Order ord;
+ @JsonProperty
+ public String col;
+ @JsonProperty
+ public int order;
+
+ @JsonCreator
+ public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
+ this.col = col;
+ this.order = order;
+ }
+
+ public OrderWrapper(Order ord) {
+ this.ord = ord;
+ this.col = ord.getCol();
+ this.order = ord.getOrder();
+ }
+
+ @JsonIgnore
+ public Order getOrder() {
+ return ord;
+ }
+ }
+
+ public Map<String, String> getPartitionNameTypeMap() {
+ return partitionNameTypeMap;
+ }
+
+ /**
+ * Wrapper for the {@link ColumnListsCache} class.
+ * Used in {@link HiveTableWrapper} for serialization and deserialization of {@link ColumnListsCache}.
+ */
+ public static class ColumnsCacheWrapper {
+ @JsonIgnore
+ private final ColumnListsCache columnListsCache;
+
+ @JsonProperty
+ private final List<List<FieldSchemaWrapper>> keys;
+
+ @JsonCreator
+ public ColumnsCacheWrapper(@JsonProperty("keys") List<List<FieldSchemaWrapper>> keys) {
+ this.keys = keys;
+ this.columnListsCache = new ColumnListsCache();
+ for (List<FieldSchemaWrapper> columns : keys) {
+ final List<FieldSchema> columnsUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper field : columns) {
+ columnsUnwrapped.add(field.getFieldSchema());
+ }
+ columnListsCache.addOrGet(columnsUnwrapped);
+ }
+ }
+
+ public ColumnsCacheWrapper(ColumnListsCache columnListsCache) {
+ this.columnListsCache = columnListsCache;
+ final List<List<FieldSchemaWrapper>> keysWrapped = Lists.newArrayList();
+ for (List<FieldSchema> columns : columnListsCache.getFields()) {
+ final List<FieldSchemaWrapper> columnsWrapped = Lists.newArrayList();
+ for (FieldSchema field : columns) {
+ columnsWrapped.add(new FieldSchemaWrapper(field));
+ }
+ keysWrapped.add(columnsWrapped);
+ }
+ this.keys = keysWrapped;
+ }
+
+ @JsonIgnore
+ public ColumnListsCache getColumnListsCache() {
+ return columnListsCache;
+ }
+ }
+}
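
Each wrapper above follows the same pattern: the Thrift-generated object stays behind @JsonIgnore, its fields are mirrored as @JsonProperty members, and the @JsonCreator constructor rebuilds the Thrift object on deserialization. A minimal sketch of that round trip using the simplest wrapper, FieldSchemaWrapper; the class name and expected JSON shown in comments are illustrative:

import org.apache.drill.exec.store.hive.HiveTableWrapper.FieldSchemaWrapper;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

import com.fasterxml.jackson.databind.ObjectMapper;

public class WrapperRoundTripSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Wrap the Thrift-generated FieldSchema so Jackson only sees plain fields.
    FieldSchemaWrapper before = new FieldSchemaWrapper(new FieldSchema("a", "int", null));
    String json = mapper.writeValueAsString(before);
    // json is expected to look like {"name":"a","type":"int","comment":null}

    // The @JsonCreator constructor rebuilds the underlying FieldSchema.
    FieldSchemaWrapper after = mapper.readValue(json, FieldSchemaWrapper.class);
    System.out.println(after.getFieldSchema().equals(before.getFieldSchema())); // true
  }
}

Keeping the Thrift instance out of the JSON means the physical plan format does not depend on Thrift's own field layout, and it is what allows StorageDescriptorWrapper to omit its column list entirely and rely on the shared cache instead.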
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 2e23aff..1d5e6bf 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -71,6 +71,7 @@ import org.joda.time.DateTimeZone;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -398,12 +399,14 @@ public class HiveUtilities {
* Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table
* to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
*
- * @param partition
- * @param table
- * @return
+ * @param partition the source of partition level parameters
+ * @param table the source of table level parameters
+ * @return properties containing both partition and table level parameters
*/
- public static Properties getPartitionMetadata(final Partition partition, final Table table) {
- final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
+ final Properties properties;
+ restoreColumns(table, partition);
+ properties = MetaStoreUtils.getPartitionMetadata(partition, table);
// SerDe expects properties from Table, but above call doesn't add Table properties.
// Include Table properties in the final list in order not to break SerDes that depend on
@@ -417,6 +420,34 @@ public class HiveUtilities {
return properties;
}
+ /**
+ * Sets the column lists from the table's cache on the table and, when present, the partition.
+ *
+ * @param table the source of the column lists cache
+ * @param partition the partition whose column list will be restored, may be null
+ */
+ public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
+ // identical column lists for the table and its partitions are stored
+ // only once, which reduces the size of the serialized physical plan
+ if (partition != null && partition.getSd().getCols() == null) {
+ partition.getSd().setCols(table.getColumnListsCache().getColumns(partition.getColumnListIndex()));
+ }
+ if (table.getSd().getCols() == null) {
+ table.getSd().setCols(table.getColumnListsCache().getColumns(0));
+ }
+ }
+
+ /**
+ * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
+ * which also restores the table's columns from the column lists cache before returning the properties from
+ * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+ */
+ public static Properties getTableMetadata(HiveTableWithColumnCache table) {
+ restoreColumns(table, null);
+ return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table.getParameters(),
+ table.getDbName(), table.getTableName(), table.getPartitionKeys());
+ }
+
public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
StringBuilder errMsg = new StringBuilder();
errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));
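
For a reader working from a deserialized plan, the two helpers above are the whole contract. A sketch of how they might be called, where the table and partitions arguments stand in for whatever the deserialized sub-scan actually provides:

import java.util.List;
import java.util.Properties;

import org.apache.drill.exec.store.hive.HivePartition;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
import org.apache.drill.exec.store.hive.HiveUtilities;

public class RestoreColumnsSketch {
  static void prepare(HiveTableWithColumnCache table, List<HivePartition> partitions) {
    // Restores the table's own columns (cache index 0) before building properties.
    Properties tableProps = HiveUtilities.getTableMetadata(table);

    for (HivePartition partition : partitions) {
      // getPartitionMetadata() calls restoreColumns() internally, filling the
      // partition's StorageDescriptor columns from the shared cache so the
      // SerDe sees a complete schema.
      Properties partitionProps = HiveUtilities.getPartitionMetadata(partition, table);
    }
  }
}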
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index 29f7757..af02c0a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveStoragePlugin;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -43,11 +44,11 @@ import com.google.common.collect.Lists;
public class DrillHiveTable extends DrillTable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHiveTable.class);
- protected final Table hiveTable;
+ protected final HiveTableWithColumnCache hiveTable;
public DrillHiveTable(String storageEngineName, HiveStoragePlugin plugin, String userName, HiveReadEntry readEntry) {
super(storageEngineName, plugin, userName, readEntry);
- this.hiveTable = new Table(readEntry.getTable());
+ this.hiveTable = new HiveTableWithColumnCache(readEntry.getTable());
}
@Override
@@ -55,7 +56,7 @@ public class DrillHiveTable extends DrillTable{
List<RelDataType> typeList = Lists.newArrayList();
List<String> fieldNameList = Lists.newArrayList();
- List<FieldSchema> hiveFields = hiveTable.getCols();
+ List<FieldSchema> hiveFields = hiveTable.getColumnListsCache().getColumns(0);
for(FieldSchema hiveField : hiveFields) {
fieldNameList.add(hiveField.getName());
typeList.add(getNullableRelDataTypeFromHiveType(
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index 7ac1896..a32f538 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -17,16 +17,19 @@
*/
package org.apache.drill.exec;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.drill.exec.hive.HiveTestBase;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
+import java.util.List;
+
public class TestHivePartitionPruning extends HiveTestBase {
// enable decimal data type
@BeforeClass
@@ -149,6 +152,30 @@ public class TestHivePartitionPruning extends HiveTestBase {
.go();
}
+ @Test // DRILL-5032
+ public void testPartitionColumnsCaching() throws Exception {
+ final String query = "EXPLAIN PLAN FOR SELECT * FROM hive.partition_with_few_schemas";
+
+ List<QueryDataBatch> queryDataBatches = testSqlWithResults(query);
+ String resultString = getResultString(queryDataBatches, "|");
+
+ // column name strings from the physical plan that differ between the two partition schemas
+ String columnString = "\"name\" : \"a\"";
+ String secondColumnString = "\"name\" : \"a1\"";
+
+ int columnIndex = resultString.indexOf(columnString);
+ assertTrue(columnIndex >= 0);
+ columnIndex = resultString.indexOf(columnString, columnIndex + 1);
+ // checks that the column is added to the physical plan only once
+ assertEquals(-1, columnIndex);
+
+ int secondColumnIndex = resultString.indexOf(secondColumnString);
+ assertTrue(secondColumnIndex >= 0);
+ secondColumnIndex = resultString.indexOf(secondColumnString, secondColumnIndex + 1);
+ // checks that the column is added to the physical plan only once
+ assertEquals(-1, secondColumnIndex);
+ }
+
@AfterClass
public static void disableDecimalDataType() throws Exception {
test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
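
The paired indexOf calls in the new test implement an "appears exactly once" check against the serialized plan. The same intent could be written with a small counting helper; this helper is hypothetical and not part of the patch:

public class PlanStringChecks {
  // Hypothetical helper equivalent to the paired indexOf assertions in the test.
  static int countOccurrences(String haystack, String needle) {
    int count = 0;
    for (int i = haystack.indexOf(needle); i >= 0; i = haystack.indexOf(needle, i + 1)) {
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    String plan = "... \"name\" : \"a\" ... \"name\" : \"a1\" ...";
    // Each column name string should occur exactly once in the serialized plan;
    // the closing quote in the needle keeps "a" from matching inside "a1".
    System.out.println(countOccurrences(plan, "\"name\" : \"a\""));   // 1
    System.out.println(countOccurrences(plan, "\"name\" : \"a1\""));  // 1
  }
}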
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 0a94867..fb4bb17 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -43,6 +43,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("hive.default", "kv_sh")
.baselineValues("hive.default", "countstar_parquet")
.baselineValues("hive.default", "simple_json")
+ .baselineValues("hive.default", "partition_with_few_schemas")
.go();
testBuilder()
@@ -243,6 +244,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("DRILL", "hive.default", "readtest_parquet", "TABLE")
.baselineValues("DRILL", "hive.default", "hiveview", "VIEW")
.baselineValues("DRILL", "hive.default", "partition_pruning_test", "TABLE")
+ .baselineValues("DRILL", "hive.default", "partition_with_few_schemas", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_parquet", "TABLE")
.baselineValues("DRILL", "hive.default", "countstar_parquet", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_sh", "TABLE")
http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 7a5b72d..435c66b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -438,6 +438,15 @@ public class HiveTestDataGenerator {
executeQuery(hiveDriver, "INSERT OVERWRITE TABLE partition_pruning_test PARTITION(c, d, e) " +
"SELECT a, b, c, d, e FROM partition_pruning_test_loadtable");
+ executeQuery(hiveDriver,
+ "CREATE TABLE IF NOT EXISTS partition_with_few_schemas(a DATE, b TIMESTAMP) "+
+ "partitioned by (c INT, d INT, e INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
+ executeQuery(hiveDriver, "INSERT OVERWRITE TABLE partition_with_few_schemas PARTITION(c, d, e) " +
+ "SELECT a, b, c, d, e FROM partition_pruning_test_loadtable");
+ executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=1, d=1, e=1) change a a1 INT");
+ executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=1, d=1, e=2) change a a1 INT");
+ executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=2, d=2, e=2) change a a1 INT");
+
// Add a partition with custom location
executeQuery(hiveDriver,
String.format("ALTER TABLE partition_pruning_test ADD PARTITION (c=99, d=98, e=97) LOCATION '%s'",