You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2016/12/20 04:27:35 UTC

[6/8] drill git commit: DRILL-5032: Drill query on hive parquet table failed with OutOfMemoryError: Java heap space

DRILL-5032: Drill query on hive parquet table failed with OutOfMemoryError: Java heap space

close apache/drill#654


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/03928af0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/03928af0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/03928af0

Branch: refs/heads/master
Commit: 03928af0b5cafd52e5b153aa852e5642b505f2c6
Parents: d8cc710
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Thu Oct 27 19:20:27 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Dec 19 15:57:36 2016 -0800

----------------------------------------------------------------------
 .../codegen/templates/HiveRecordReaders.java    |   4 +-
 .../planner/sql/HivePartitionDescriptor.java    |  12 +-
 ...onvertHiveParquetScanToDrillParquetScan.java |  14 +-
 .../drill/exec/store/hive/ColumnListsCache.java |  95 ++++
 .../store/hive/DrillHiveMetaStoreClient.java    |  40 +-
 .../exec/store/hive/HiveAbstractReader.java     |  15 +-
 .../store/hive/HiveDrillNativeParquetScan.java  |   4 +-
 .../hive/HiveDrillNativeScanBatchCreator.java   |   6 +-
 .../exec/store/hive/HiveMetadataProvider.java   |  11 +-
 .../drill/exec/store/hive/HivePartition.java    |  61 +++
 .../drill/exec/store/hive/HiveReadEntry.java    |  31 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |  17 +-
 .../exec/store/hive/HiveScanBatchCreator.java   |   8 +-
 .../drill/exec/store/hive/HiveSubScan.java      |  15 +-
 .../apache/drill/exec/store/hive/HiveTable.java | 382 ---------------
 .../store/hive/HiveTableWithColumnCache.java    |  76 +++
 .../drill/exec/store/hive/HiveTableWrapper.java | 466 +++++++++++++++++++
 .../drill/exec/store/hive/HiveUtilities.java    |  41 +-
 .../exec/store/hive/schema/DrillHiveTable.java  |   7 +-
 .../drill/exec/TestHivePartitionPruning.java    |  29 +-
 .../exec/hive/TestInfoSchemaOnHiveStorage.java  |   2 +
 .../exec/store/hive/HiveTestDataGenerator.java  |   9 +
 .../store/hive/schema/TestColumnListCache.java  | 111 +++++
 23 files changed, 980 insertions(+), 476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
index 0dc8c08..7d6733e 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -43,8 +43,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -75,7 +73,7 @@ public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
   Object value;
 </#if>
 
-  public Hive${entry.hiveReader}Reader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+  public Hive${entry.hiveReader}Reader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
                        FragmentContext context, final HiveConf hiveConf,
                        UserGroupInformation proxyUgi) throws ExecutionSetupException {
     super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index d42aea7..b22c14d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -31,10 +31,10 @@ import org.apache.drill.exec.planner.PartitionLocation;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.hive.HiveTableWrapper;
 import org.apache.drill.exec.store.hive.HiveUtilities;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTable;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.hive.metastore.api.Partition;
 
@@ -64,7 +64,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     this.scanRel = scanRel;
     this.managedBuffer = managedBuffer.reallocIfNeeded(256);
     this.defaultPartitionValue = defaultPartitionValue;
-    for (HiveTable.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
+    for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
       partitionMap.put(wrapper.name, i);
       i++;
     }
@@ -115,7 +115,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     }
 
     for(ValueVector v : vectors) {
-      if(v == null){
+      if (v == null) {
         continue;
       }
       v.getMutator().setValueCount(partitions.size());
@@ -166,10 +166,10 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
   private GroupScan createNewGroupScan(List<PartitionLocation> newPartitionLocations) throws ExecutionSetupException {
     HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
-    List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
-    List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+    List<HiveTableWrapper.HivePartitionWrapper> oldPartitions = origReadEntry.partitions;
+    List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList();
 
-    for (HiveTable.HivePartition part: oldPartitions) {
+    for (HiveTableWrapper.HivePartitionWrapper part: oldPartitions) {
       String partitionLocation = part.getPartition().getSd().getLocation();
       for (PartitionLocation newPartitionLocation: newPartitionLocations) {
         if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) {

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 228308f..df85ca0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -39,10 +39,10 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
 import org.apache.drill.exec.store.hive.HiveUtilities;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -97,23 +97,23 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
 
     final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     final HiveConf hiveConf = hiveScan.getHiveConf();
-    final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+    final HiveTableWithColumnCache hiveTable = hiveScan.hiveReadEntry.getTable();
 
     final Class<? extends InputFormat<?,?>> tableInputFormat =
-        getInputFormatFromSD(MetaStoreUtils.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(),
+        getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(),
             hiveConf);
     if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
       return false;
     }
 
-    final List<HivePartition> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
+    final List<HivePartitionWrapper> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
     if (partitions == null) {
       return true;
     }
 
     final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
     // Make sure all partitions have the same input format as the table input format
-    for (HivePartition partition : partitions) {
+    for (HivePartitionWrapper partition : partitions) {
       final StorageDescriptor partitionSD = partition.getPartition().getSd();
       Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(
           HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD,
@@ -179,7 +179,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
           getPartitionColMapping(hiveTable, partitionColumnLabel);
 
       final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
-      if(hiveScanRel.getRowType().getFieldCount() == 0) {
+      if (hiveScanRel.getRowType().getFieldCount() == 0) {
         call.transformTo(nativeScanRel);
       } else {
         final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
new file mode 100644
index 0000000..ae4baa1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/ColumnListsCache.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The class represents "cache" for partition and table columns.
+ * Used to reduce physical plan for Hive tables.
+ * Only unique partition lists of columns stored in the column lists cache.
+ * Table columns should be stored at index 0.
+ */
+public class ColumnListsCache {
+  // contains immutable column lists
+  private final List<List<FieldSchema>> fields;
+
+  // keys of the map are column lists and values are them positions in list fields
+  private final Map<List<FieldSchema>, Integer> keys;
+
+  public ColumnListsCache(Table table) {
+    this();
+    // table columns stored at index 0.
+    addOrGet(table.getSd().getCols());
+  }
+
+  public ColumnListsCache() {
+    this.fields = Lists.newArrayList();
+    this.keys = Maps.newHashMap();
+  }
+
+  /**
+   * Checks if column list has been added before and returns position of column list.
+   * If list is unique, adds list to the fields list and returns it position.
+   * Returns -1, if {@param columns} equals null.
+   *
+   * @param columns list of columns
+   * @return index of {@param columns} or -1, if {@param columns} equals null
+   */
+  public int addOrGet(List<FieldSchema> columns) {
+    if (columns == null) {
+      return -1;
+    }
+    Integer index = keys.get(columns);
+    if (index != null) {
+      return index;
+    } else {
+      index = fields.size();
+      final List<FieldSchema> immutableList = ImmutableList.copyOf(columns);
+      fields.add(immutableList);
+      keys.put(immutableList, index);
+      return index;
+    }
+  }
+
+  /**
+   * Returns list of columns at the specified position in fields list,
+   * or null if index is negative or greater than fields list size.
+   *
+   * @param index index of column list to return
+   * @return list of columns at the specified position in fields list
+   * or null if index is negative or greater than fields list size
+   */
+  public List<FieldSchema> getColumns(int index) {
+    if (index >= 0 && index < fields.size()) {
+      return fields.get(index);
+    } else {
+      return null;
+    }
+  }
+
+  public List<List<FieldSchema>> getFields() {
+    return Lists.newArrayList(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
index d7ba659..92efdc7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
@@ -23,8 +23,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.calcite.schema.Schema;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -270,9 +268,9 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
   /** Helper method which gets table metadata. Retries once if the first call to fetch the metadata fails */
   protected static HiveReadEntry getHiveReadEntryHelper(final IMetaStoreClient mClient, final String dbName,
       final String tableName) throws TException {
-    Table t = null;
+    Table table = null;
     try {
-      t = mClient.getTable(dbName, tableName);
+      table = mClient.getTable(dbName, tableName);
     } catch (MetaException | NoSuchObjectException e) {
       throw e;
     } catch (TException e) {
@@ -283,10 +281,10 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
         logger.warn("Failure while attempting to close existing hive metastore connection. May leak connection.", ex);
       }
       mClient.reconnect();
-      t = mClient.getTable(dbName, tableName);
+      table = mClient.getTable(dbName, tableName);
     }
 
-    if (t == null) {
+    if (table == null) {
       throw new UnknownTableException(String.format("Unable to find table '%s'.", tableName));
     }
 
@@ -306,16 +304,34 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
       partitions = mClient.listPartitions(dbName, tableName, (short) -1);
     }
 
-    List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
-    for (Partition part : partitions) {
-      hivePartitions.add(new HiveTable.HivePartition(part));
+    List<HiveTableWrapper.HivePartitionWrapper> hivePartitionWrappers = Lists.newArrayList();
+    HiveTableWithColumnCache hiveTable = new HiveTableWithColumnCache(table, new ColumnListsCache(table));
+    for (Partition partition : partitions) {
+      hivePartitionWrappers.add(createPartitionWithSpecColumns(hiveTable, partition));
     }
 
-    if (hivePartitions.size() == 0) {
-      hivePartitions = null;
+    if (hivePartitionWrappers.isEmpty()) {
+      hivePartitionWrappers = null;
     }
 
-    return new HiveReadEntry(new HiveTable(t), hivePartitions);
+    return new HiveReadEntry(new HiveTableWrapper(hiveTable), hivePartitionWrappers);
+  }
+
+  /**
+   * Helper method which stores partition columns in table columnListCache. If table columnListCache has exactly the
+   * same columns as partition, in partition stores columns index that corresponds to identical column list.
+   * If table columnListCache hasn't such column list, the column list adds to table columnListCache and in partition
+   * stores columns index that corresponds to column list.
+   *
+   * @param table     hive table instance
+   * @param partition partition instance
+   * @return hive partition wrapper
+   */
+  public static HiveTableWrapper.HivePartitionWrapper createPartitionWithSpecColumns(HiveTableWithColumnCache table, Partition partition) {
+    int listIndex = table.getColumnListsCache().addOrGet(partition.getSd().getCols());
+    HivePartition hivePartition = new HivePartition(partition, listIndex);
+    HiveTableWrapper.HivePartitionWrapper hivePartitionWrapper = new HiveTableWrapper.HivePartitionWrapper(hivePartition);
+    return hivePartitionWrapper;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
index 107fc66..8c6df84 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
@@ -39,10 +39,7 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -67,8 +64,8 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
 
   protected final DrillBuf managedBuffer;
 
-  protected Table table;
-  protected Partition partition;
+  protected HiveTableWithColumnCache table;
+  protected HivePartition partition;
   protected InputSplit inputSplit;
   protected List<String> selectedColumnNames;
   protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
@@ -106,9 +103,9 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
 
   protected static final int TARGET_RECORD_COUNT = 4000;
 
-  public HiveAbstractReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                       FragmentContext context, final HiveConf hiveConf,
-                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+  public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                            FragmentContext context, final HiveConf hiveConf,
+                            UserGroupInformation proxyUgi) throws ExecutionSetupException {
     this.table = table;
     this.partition = partition;
     this.inputSplit = inputSplit;
@@ -130,7 +127,7 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
 
     Properties tableProperties;
     try {
-      tableProperties = MetaStoreUtils.getTableMetadata(table);
+      tableProperties = HiveUtilities.getTableMetadata(table);
       final Properties partitionProperties =
           (partition == null) ?  tableProperties :
               HiveUtilities.getPartitionMetadata(partition, table);

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 17cae22..ccec61a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
 
 import java.io.IOException;
 import java.util.List;
@@ -103,7 +103,7 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @Override
   public String toString() {
-    final List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+    final List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
     int numPartitions = partitions == null ? 0 : partitions.size();
     return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
         + ", columns=" + columns

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 42db5d0..66f41e2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -42,8 +42,6 @@ import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -62,9 +60,9 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
   @Override
   public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    final Table table = config.getTable();
+    final HiveTableWithColumnCache table = config.getTable();
     final List<InputSplit> splits = config.getInputSplits();
-    final List<Partition> partitions = config.getPartitions();
+    final List<HivePartition> partitions = config.getPartitions();
     final List<SchemaPath> columns = config.getColumns();
     final String partitionDesignator = context.getOptions()
         .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 49f7689..e80b37b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -83,7 +82,7 @@ public class HiveMetadataProvider {
   public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
     final Stopwatch timeGetStats = Stopwatch.createStarted();
 
-    final Table table = hiveReadEntry.getTable();
+    final HiveTableWithColumnCache table = hiveReadEntry.getTable();
     try {
       if (!isPartitionedTable) {
         final Properties properties = MetaStoreUtils.getTableMetadata(table);
@@ -96,7 +95,7 @@ public class HiveMetadataProvider {
         return getStatsEstimateFromInputSplits(getTableInputSplits());
       } else {
         final HiveStats aggStats = new HiveStats(0, 0);
-        for(Partition partition : hiveReadEntry.getPartitions()) {
+        for(HivePartition partition : hiveReadEntry.getPartitions()) {
           final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
           HiveStats stats = getStatsFromProps(properties);
 
@@ -124,7 +123,7 @@ public class HiveMetadataProvider {
       return tableInputSplits;
     }
 
-    final Properties properties = MetaStoreUtils.getTableMetadata(hiveReadEntry.getTable());
+    final Properties properties = HiveUtilities.getTableMetadata(hiveReadEntry.getTable());
     tableInputSplits = splitInputWithUGI(properties, hiveReadEntry.getTable().getSd(), null);
 
     return tableInputSplits;
@@ -133,7 +132,7 @@ public class HiveMetadataProvider {
   /** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
    * metadata cache requests for the same partition(s).
    */
-  private List<InputSplitWrapper> getPartitionInputSplits(final Partition partition) throws Exception {
+  private List<InputSplitWrapper> getPartitionInputSplits(final HivePartition partition) throws Exception {
     if (partitionInputSplitMap.containsKey(partition)) {
       return partitionInputSplitMap.get(partition);
     }
@@ -161,7 +160,7 @@ public class HiveMetadataProvider {
       }
 
       final List<InputSplitWrapper> splits = Lists.newArrayList();
-      for (Partition p : hiveReadEntry.getPartitions()) {
+      for (HivePartition p : hiveReadEntry.getPartitions()) {
         splits.addAll(getPartitionInputSplits(p));
       }
       return splits;

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
new file mode 100644
index 0000000..ad539b1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartition.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is wrapper of {@link Partition} class and used for
+ * storage of such additional information as index of list in column lists cache.
+ */
+public class HivePartition extends Partition {
+  // index of partition column list in the table's column list cache
+  private int columnListIndex;
+
+  public HivePartition(
+    List<String> values,
+    String dbName,
+    String tableName,
+    int createTime,
+    int lastAccessTime,
+    StorageDescriptor sd,
+    Map<String,String> parameters,
+    int columnListIndex)
+  {
+    super(values, dbName, tableName, createTime, lastAccessTime, sd, parameters);
+    this.columnListIndex = columnListIndex;
+  }
+
+  public HivePartition(Partition other, int columnListIndex) {
+    super(other);
+    this.columnListIndex = columnListIndex;
+  }
+
+  /**
+   * To reduce physical plan for Hive tables, in partitions does not stored list of columns
+   * but stored index of that list in the table's column list cache.
+   *
+   * @return index of partition column list in the table's column list cache
+   */
+  public int getColumnListIndex() {
+    return columnListIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 4df33ec..0cf7433 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -21,9 +21,7 @@ import java.util.List;
 
 import org.apache.calcite.schema.Schema.TableType;
 
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -33,42 +31,47 @@ import com.google.common.collect.Lists;
 public class HiveReadEntry {
 
   @JsonProperty("table")
-  public HiveTable table;
+  public HiveTableWrapper table;
   @JsonProperty("partitions")
-  public List<HiveTable.HivePartition> partitions;
+  public List<HivePartitionWrapper> partitions;
 
   @JsonIgnore
-  private List<Partition> partitionsUnwrapped = Lists.newArrayList();
+  private List<HivePartition> partitionsUnwrapped = Lists.newArrayList();
 
   @JsonCreator
-  public HiveReadEntry(@JsonProperty("table") HiveTable table,
-                       @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
+  public HiveReadEntry(@JsonProperty("table") HiveTableWrapper table,
+                       @JsonProperty("partitions") List<HivePartitionWrapper> partitions) {
     this.table = table;
     this.partitions = partitions;
     if (partitions != null) {
-      for(HiveTable.HivePartition part : partitions) {
+      for(HivePartitionWrapper part : partitions) {
         partitionsUnwrapped.add(part.getPartition());
       }
     }
   }
 
   @JsonIgnore
-  public Table getTable() {
+  public HiveTableWithColumnCache getTable() {
     return table.getTable();
   }
 
   @JsonIgnore
-  public List<Partition> getPartitions() {
+  public HiveTableWrapper getTableWrapper() {
+    return table;
+  }
+
+  @JsonIgnore
+  public List<HivePartition> getPartitions() {
     return partitionsUnwrapped;
   }
 
   @JsonIgnore
-  public HiveTable getHiveTableWrapper() {
+  public HiveTableWrapper getHiveTableWrapper() {
     return table;
   }
 
   @JsonIgnore
-  public List<HivePartition> getHivePartitionWrappers() {
+  public List<HivePartitionWrapper> getHivePartitionWrappers() {
     return partitions;
   }
 
@@ -81,7 +84,7 @@ public class HiveReadEntry {
     return TableType.TABLE;
   }
 
-  public String getPartitionLocation(HiveTable.HivePartition partition) {
+  public String getPartitionLocation(HivePartitionWrapper partition) {
     String partitionPath = table.getTable().getSd().getLocation();
 
     for (String value: partition.values) {

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 1a58cbd..c6cc8a2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -40,9 +40,10 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.InputSplit;
 
@@ -56,6 +57,8 @@ import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 
+import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
+
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
@@ -151,12 +154,14 @@ public class HiveScan extends AbstractGroupScan {
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
       final List<InputSplitWrapper> splits = mappings.get(minorFragmentId);
-      List<HivePartition> parts = Lists.newArrayList();
+      List<HivePartitionWrapper> parts = Lists.newArrayList();
       final List<String> encodedInputSplits = Lists.newArrayList();
       final List<String> splitTypes = Lists.newArrayList();
       for (final InputSplitWrapper split : splits) {
-        if (split.getPartition() != null) {
-          parts.add(new HivePartition(split.getPartition()));
+        final Partition splitPartition = split.getPartition();
+        if (splitPartition != null) {
+          HiveTableWithColumnCache table = hiveReadEntry.getTable();
+          parts.add(createPartitionWithSpecColumns(new HiveTableWithColumnCache(table, new ColumnListsCache(table)), splitPartition));
         }
 
         encodedInputSplits.add(serializeInputSplit(split.getSplit()));
@@ -166,7 +171,7 @@ public class HiveScan extends AbstractGroupScan {
         parts = null;
       }
 
-      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts);
+      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
       return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, storagePlugin);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
@@ -259,7 +264,7 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public String toString() {
-    List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+    List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
     int numPartitions = partitions == null ? 0 : partitions.size();
     return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
         + ", columns=" + columns

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 7aece71..47ea323 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -30,8 +30,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -63,9 +61,9 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
   public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     List<RecordReader> readers = Lists.newArrayList();
-    Table table = config.getTable();
+    HiveTableWithColumnCache table = config.getTable();
     List<InputSplit> splits = config.getInputSplits();
-    List<Partition> partitions = config.getPartitions();
+    List<HivePartition> partitions = config.getPartitions();
     boolean hasPartitions = (partitions != null && partitions.size() > 0);
     int i = 0;
     final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(),
@@ -80,7 +78,7 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
     }
     Constructor<? extends HiveAbstractReader> readerConstructor = null;
     try {
-      readerConstructor = readerClass.getConstructor(Table.class, Partition.class,
+      readerConstructor = readerClass.getConstructor(HiveTableWithColumnCache.class, HivePartition.class,
           InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
           UserGroupInformation.class);
       for (InputSplit split : splits) {

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 74b68a6..107188c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -27,19 +26,13 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.InputSplit;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -58,9 +51,9 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @JsonIgnore
   protected List<InputSplit> inputSplits = Lists.newArrayList();
   @JsonIgnore
-  protected Table table;
+  protected HiveTableWithColumnCache table;
   @JsonIgnore
-  protected List<Partition> partitions;
+  protected List<HivePartition> partitions;
   @JsonIgnore
   protected HiveStoragePlugin storagePlugin;
 
@@ -112,11 +105,11 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return splits;
   }
 
-  public Table getTable() {
+  public HiveTableWithColumnCache getTable() {
     return table;
   }
 
-  public List<Partition> getPartitions() {
+  public List<HivePartition> getPartitions() {
     return partitions;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
deleted file mode 100644
index b6dd079..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
-
-@JsonTypeName("table")
-public class HiveTable {
-
-  @JsonIgnore
-  private Table table;
-
-  @JsonProperty
-  public String tableName;
-  @JsonProperty
-  public String dbName;
-  @JsonProperty
-  public String owner;
-  @JsonProperty
-  public int createTime;
-  @JsonProperty
-  public int lastAccessTime;
-  @JsonProperty
-  public int retention;
-  @JsonProperty
-  public StorageDescriptorWrapper sd;
-  @JsonProperty
-  public List<FieldSchemaWrapper> partitionKeys;
-  @JsonProperty
-  public Map<String,String> parameters;
-  @JsonProperty
-  public String viewOriginalText;
-  @JsonProperty
-  public String viewExpandedText;
-  @JsonProperty
-  public String tableType;
-
-  @JsonIgnore
-  public final Map<String, String> partitionNameTypeMap = new HashMap<>();
-
-  @JsonCreator
-  public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime,
-                   @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
-                   @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
-                   @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType
-  ) {
-    this.tableName = tableName;
-    this.dbName = dbName;
-    this.owner = owner;
-    this.createTime = createTime;
-    this.lastAccessTime = lastAccessTime;
-    this.retention = retention;
-    this.sd = sd;
-    this.partitionKeys = partitionKeys;
-    this.parameters = parameters;
-    this.viewOriginalText = viewOriginalText;
-    this.viewExpandedText = viewExpandedText;
-    this.tableType = tableType;
-
-    List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
-    for (FieldSchemaWrapper w : partitionKeys) {
-      partitionKeysUnwrapped.add(w.getFieldSchema());
-      partitionNameTypeMap.put(w.name, w.type);
-    }
-    StorageDescriptor sdUnwrapped = sd.getSd();
-    this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
-        parameters, viewOriginalText, viewExpandedText, tableType);
-  }
-
-  public HiveTable(Table table) {
-    if (table == null) {
-      return;
-    }
-    this.table = table;
-    this.tableName = table.getTableName();
-    this.dbName = table.getDbName();
-    this.owner = table.getOwner();
-    this.createTime = table.getCreateTime();
-    this.lastAccessTime = table.getLastAccessTime();
-    this.retention = table.getRetention();
-    this.sd = new StorageDescriptorWrapper(table.getSd());
-    this.partitionKeys = Lists.newArrayList();
-    for (FieldSchema f : table.getPartitionKeys()) {
-      this.partitionKeys.add(new FieldSchemaWrapper(f));
-      partitionNameTypeMap.put(f.getName(), f.getType());
-    }
-    this.parameters = table.getParameters();
-    this.viewOriginalText = table.getViewOriginalText();
-    this.viewExpandedText = table.getViewExpandedText();
-    this.tableType = table.getTableType();
-  }
-
-  @JsonIgnore
-  public Table getTable() {
-    return table;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("Table(");
-
-    sb.append("dbName:");
-    sb.append(this.dbName);
-    sb.append(", ");
-
-    sb.append("tableName:");
-    sb.append(this.tableName);
-    sb.append(")");
-
-    return sb.toString();
-  }
-
-  public static class HivePartition {
-
-    @JsonIgnore
-    private Partition partition;
-
-    @JsonProperty
-    public List<String> values;
-    @JsonProperty
-    public String tableName;
-    @JsonProperty
-    public String dbName;
-    @JsonProperty
-    public int createTime;
-    @JsonProperty
-    public int lastAccessTime;
-    @JsonProperty
-    public StorageDescriptorWrapper sd;
-    @JsonProperty
-    public Map<String,String> parameters;
-
-    @JsonCreator
-    public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
-                         @JsonProperty("lastAccessTime") int lastAccessTime,  @JsonProperty("sd") StorageDescriptorWrapper sd,
-                         @JsonProperty("parameters") Map<String, String> parameters
-    ) {
-      this.values = values;
-      this.tableName = tableName;
-      this.dbName = dbName;
-      this.createTime = createTime;
-      this.lastAccessTime = lastAccessTime;
-      this.sd = sd;
-      this.parameters = parameters;
-
-      StorageDescriptor sdUnwrapped = sd.getSd();
-      this.partition = new org.apache.hadoop.hive.metastore.api.Partition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters);
-    }
-
-    public HivePartition(Partition partition) {
-      if (partition == null) {
-        return;
-      }
-      this.partition = partition;
-      this.values = partition.getValues();
-      this.tableName = partition.getTableName();
-      this.dbName = partition.getDbName();
-      this.createTime = partition.getCreateTime();
-      this.lastAccessTime = partition.getLastAccessTime();
-      this.sd = new StorageDescriptorWrapper(partition.getSd());
-      this.parameters = partition.getParameters();
-    }
-
-    @JsonIgnore
-    public Partition getPartition() {
-      return partition;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder("Partition(");
-      sb.append("values:");
-      sb.append(this.values);
-      sb.append(")");
-      return sb.toString();
-    }
-  }
-
-  public static class StorageDescriptorWrapper {
-    @JsonIgnore
-    private StorageDescriptor sd;
-    @JsonProperty
-    public List<FieldSchemaWrapper> cols;
-    @JsonProperty
-    public String location;
-    @JsonProperty
-    public String inputFormat;
-    @JsonProperty
-    public String outputFormat;
-    @JsonProperty
-    public boolean compressed;
-    @JsonProperty
-    public int numBuckets;
-    @JsonProperty
-    public SerDeInfoWrapper serDeInfo;
-    //    @JsonProperty
-//    public List<String> bucketCols;
-    @JsonProperty
-    public List<OrderWrapper> sortCols;
-    @JsonProperty
-    public Map<String,String> parameters;
-
-    @JsonCreator
-    public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
-                                    @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
-                                    @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo,  @JsonProperty("sortCols") List<OrderWrapper> sortCols,
-                                    @JsonProperty("parameters") Map<String,String> parameters) {
-      this.cols = cols;
-      this.location = location;
-      this.inputFormat = inputFormat;
-      this.outputFormat = outputFormat;
-      this.compressed = compressed;
-      this.numBuckets = numBuckets;
-      this.serDeInfo = serDeInfo;
-//      this.bucketCols = bucketCols;
-      this.sortCols = sortCols;
-      this.parameters = parameters;
-      List<FieldSchema> colsUnwrapped = Lists.newArrayList();
-      for (FieldSchemaWrapper w: cols) {
-        colsUnwrapped.add(w.getFieldSchema());
-      }
-      SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
-      List<Order> sortColsUnwrapped = Lists.newArrayList();
-      for (OrderWrapper w : sortCols) {
-        sortColsUnwrapped.add(w.getOrder());
-      }
-//      this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
-//              bucketCols, sortColsUnwrapped, parameters);
-      this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
-          null, sortColsUnwrapped, parameters);
-    }
-
-    public StorageDescriptorWrapper(StorageDescriptor sd) {
-      this.sd = sd;
-      this.cols = Lists.newArrayList();
-      for (FieldSchema f : sd.getCols()) {
-        this.cols.add(new FieldSchemaWrapper(f));
-      }
-      this.location = sd.getLocation();
-      this.inputFormat = sd.getInputFormat();
-      this.outputFormat = sd.getOutputFormat();
-      this.compressed = sd.isCompressed();
-      this.numBuckets = sd.getNumBuckets();
-      this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo());
-//      this.bucketCols = sd.getBucketCols();
-      this.sortCols = Lists.newArrayList();
-      for (Order o : sd.getSortCols()) {
-        this.sortCols.add(new OrderWrapper(o));
-      }
-      this.parameters = sd.getParameters();
-    }
-
-    @JsonIgnore
-    public StorageDescriptor getSd() {
-      return sd;
-    }
-
-  }
-
-  public static class SerDeInfoWrapper {
-    @JsonIgnore
-    private SerDeInfo serDeInfo;
-    @JsonProperty
-    public String name;
-    @JsonProperty
-    public String serializationLib;
-    @JsonProperty
-    public Map<String,String> parameters;
-
-    @JsonCreator
-    public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
-      this.name = name;
-      this.serializationLib = serializationLib;
-      this.parameters = parameters;
-      this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
-    }
-
-    public SerDeInfoWrapper(SerDeInfo serDeInfo) {
-      this.serDeInfo = serDeInfo;
-      this.name = serDeInfo.getName();
-      this.serializationLib = serDeInfo.getSerializationLib();
-      this.parameters = serDeInfo.getParameters();
-    }
-
-    @JsonIgnore
-    public SerDeInfo getSerDeInfo() {
-      return serDeInfo;
-    }
-  }
-
-  public static class FieldSchemaWrapper {
-    @JsonIgnore
-    private FieldSchema fieldSchema;
-    @JsonProperty
-    public String name;
-    @JsonProperty
-    public String type;
-    @JsonProperty
-    public String comment;
-
-    @JsonCreator
-    public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
-      this.name = name;
-      this.type = type;
-      this.comment = comment;
-      this.fieldSchema = new FieldSchema(name, type, comment);
-    }
-
-    public FieldSchemaWrapper(FieldSchema fieldSchema) {
-      this.fieldSchema = fieldSchema;
-      this.name = fieldSchema.getName();
-      this.type = fieldSchema.getType();
-      this.comment = fieldSchema.getComment();
-    }
-
-    @JsonIgnore
-    public FieldSchema getFieldSchema() {
-      return fieldSchema;
-    }
-  }
-
-  public static class OrderWrapper {
-    @JsonIgnore
-    private Order ord;
-    @JsonProperty
-    public String col;
-    @JsonProperty
-    public int order;
-
-    @JsonCreator
-    public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
-      this.col = col;
-      this.order = order;
-    }
-
-    public OrderWrapper(Order ord) {
-      this.ord = ord;
-      this.col = ord.getCol();
-      this.order = ord.getOrder();
-    }
-
-    @JsonIgnore
-    public Order getOrder() {
-      return ord;
-    }
-  }
-
-  public Map<String, String> getPartitionNameTypeMap() {
-    return partitionNameTypeMap;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
new file mode 100644
index 0000000..91888ef
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is wrapper of {@link Table} class and used for
+ * storage of such additional information as column lists cache.
+ */
+public class HiveTableWithColumnCache extends Table {
+
+  private ColumnListsCache columnListsCache;
+
+  public HiveTableWithColumnCache() {
+    super();
+  }
+
+  public HiveTableWithColumnCache(
+    String tableName,
+    String dbName,
+    String owner,
+    int createTime,
+    int lastAccessTime,
+    int retention,
+    StorageDescriptor sd,
+    List<FieldSchema> partitionKeys,
+    Map<String,String> parameters,
+    String viewOriginalText,
+    String viewExpandedText,
+    String tableType,
+    ColumnListsCache columnListsCache) {
+    super(tableName, dbName, owner, createTime, lastAccessTime, retention, sd,
+      partitionKeys, parameters, viewOriginalText, viewExpandedText, tableType);
+    this.columnListsCache = columnListsCache;
+  }
+
+  public HiveTableWithColumnCache(HiveTableWithColumnCache other) {
+    super(other);
+    columnListsCache = other.getColumnListsCache();
+  }
+
+  public HiveTableWithColumnCache(Table other, ColumnListsCache columnListsCache) {
+    super(other);
+    this.columnListsCache = columnListsCache;
+  }
+
+  /**
+   * To reduce physical plan for Hive tables, unique partition lists of columns stored in the
+   * table's column lists cache.
+   *
+   * @return table's column lists cache
+   */
+  public ColumnListsCache getColumnListsCache() {
+    return columnListsCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
new file mode 100644
index 0000000..7f2afa6
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Lists;
+
+@JsonTypeName("table")
+public class HiveTableWrapper {
+
+  @JsonIgnore
+  private HiveTableWithColumnCache table;
+
+  @JsonProperty
+  public String tableName;
+  @JsonProperty
+  public String dbName;
+  @JsonProperty
+  public String owner;
+  @JsonProperty
+  public int createTime;
+  @JsonProperty
+  public int lastAccessTime;
+  @JsonProperty
+  public int retention;
+  @JsonProperty
+  public StorageDescriptorWrapper sd;
+  @JsonProperty
+  public List<FieldSchemaWrapper> partitionKeys;
+  @JsonProperty
+  public Map<String,String> parameters;
+  @JsonProperty
+  public String viewOriginalText;
+  @JsonProperty
+  public String viewExpandedText;
+  @JsonProperty
+  public String tableType;
+  @JsonProperty
+  public ColumnsCacheWrapper columnsCache;
+
+  @JsonIgnore
+  public final Map<String, String> partitionNameTypeMap = new HashMap<>();
+
+  @JsonCreator
+  public HiveTableWrapper(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner,
+                          @JsonProperty("createTime") int createTime, @JsonProperty("lastAccessTime") int lastAccessTime,
+                          @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
+                          @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
+                          @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText,
+                          @JsonProperty("tableType") String tableType, @JsonProperty("columnsCache") ColumnsCacheWrapper columnsCache
+  ) {
+    this.tableName = tableName;
+    this.dbName = dbName;
+    this.owner = owner;
+    this.createTime = createTime;
+    this.lastAccessTime = lastAccessTime;
+    this.retention = retention;
+    this.sd = sd;
+    this.partitionKeys = partitionKeys;
+    this.parameters = parameters;
+    this.viewOriginalText = viewOriginalText;
+    this.viewExpandedText = viewExpandedText;
+    this.tableType = tableType;
+    this.columnsCache = columnsCache;
+
+    List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
+    for (FieldSchemaWrapper w : partitionKeys) {
+      partitionKeysUnwrapped.add(w.getFieldSchema());
+      partitionNameTypeMap.put(w.name, w.type);
+    }
+    StorageDescriptor sdUnwrapped = sd.getSd();
+    this.table = new HiveTableWithColumnCache(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
+        parameters, viewOriginalText, viewExpandedText, tableType, columnsCache.getColumnListsCache());
+  }
+
+  public HiveTableWrapper(HiveTableWithColumnCache table) {
+    if (table == null) {
+      return;
+    }
+    this.table = table;
+    this.tableName = table.getTableName();
+    this.dbName = table.getDbName();
+    this.owner = table.getOwner();
+    this.createTime = table.getCreateTime();
+    this.lastAccessTime = table.getLastAccessTime();
+    this.retention = table.getRetention();
+    this.sd = new StorageDescriptorWrapper(table.getSd());
+    this.partitionKeys = Lists.newArrayList();
+    for (FieldSchema f : table.getPartitionKeys()) {
+      this.partitionKeys.add(new FieldSchemaWrapper(f));
+      partitionNameTypeMap.put(f.getName(), f.getType());
+    }
+    this.parameters = table.getParameters();
+    this.viewOriginalText = table.getViewOriginalText();
+    this.viewExpandedText = table.getViewExpandedText();
+    this.tableType = table.getTableType();
+    this.columnsCache = new ColumnsCacheWrapper(table.getColumnListsCache());
+  }
+
+  @JsonIgnore
+  public HiveTableWithColumnCache getTable() {
+    return table;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Table(");
+
+    sb.append("dbName:");
+    sb.append(this.dbName);
+    sb.append(", ");
+
+    sb.append("tableName:");
+    sb.append(this.tableName);
+    sb.append(")");
+
+    return sb.toString();
+  }
+
+  /**
+   * Wrapper for {@link Partition} class. Used for serialization and deserialization of {@link HivePartition}.
+   */
+  public static class HivePartitionWrapper {
+
+    @JsonIgnore
+    private HivePartition partition;
+
+    @JsonProperty
+    public List<String> values;
+
+    @JsonProperty
+    public String tableName;
+
+    @JsonProperty
+    public String dbName;
+
+    @JsonProperty
+    public int createTime;
+
+    @JsonProperty
+    public int lastAccessTime;
+
+    @JsonProperty
+    public StorageDescriptorWrapper sd;
+
+    @JsonProperty
+    public Map<String, String> parameters;
+
+    @JsonProperty
+    private int columnListIndex;
+
+    @JsonCreator
+    public HivePartitionWrapper(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName,
+                                @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
+                                @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd,
+                                @JsonProperty("parameters") Map<String, String> parameters, @JsonProperty("columnListIndex") int columnListIndex) {
+      this.values = values;
+      this.tableName = tableName;
+      this.dbName = dbName;
+      this.createTime = createTime;
+      this.lastAccessTime = lastAccessTime;
+      this.sd = sd;
+      this.parameters = parameters;
+      this.columnListIndex = columnListIndex;
+
+      StorageDescriptor sdUnwrapped = sd.getSd();
+      this.partition = new HivePartition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters, columnListIndex);
+    }
+
+    public HivePartitionWrapper(HivePartition partition) {
+      if (partition == null) {
+        return;
+      }
+      this.partition = partition;
+      this.values = partition.getValues();
+      this.tableName = partition.getTableName();
+      this.dbName = partition.getDbName();
+      this.createTime = partition.getCreateTime();
+      this.lastAccessTime = partition.getLastAccessTime();
+      this.sd = new StorageDescriptorWrapper(partition.getSd());
+      this.parameters = partition.getParameters();
+      this.columnListIndex = partition.getColumnListIndex();
+    }
+
+    @JsonIgnore
+    public HivePartition getPartition() {
+      return partition;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("Partition(");
+      sb.append("values:");
+      sb.append(this.values);
+      sb.append(")");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Wrapper for {@link StorageDescriptor} class.
+   * Used in {@link HivePartitionWrapper} and {@link HiveTableWrapper}
+   * for serialization and deserialization of {@link StorageDescriptor}.
+   */
+  public static class StorageDescriptorWrapper {
+
+    @JsonIgnore
+    private StorageDescriptor sd;
+
+    // column lists stored in ColumnListsCache
+    @JsonIgnore
+    public List<FieldSchemaWrapper> columns;
+
+    @JsonProperty
+    public String location;
+
+    @JsonProperty
+    public String inputFormat;
+
+    @JsonProperty
+    public String outputFormat;
+
+    @JsonProperty
+    public boolean compressed;
+
+    @JsonProperty
+    public int numBuckets;
+
+    @JsonProperty
+    public SerDeInfoWrapper serDeInfo;
+
+    @JsonProperty
+    public List<OrderWrapper> sortCols;
+
+    @JsonProperty
+    public Map<String, String> parameters;
+
+    @JsonCreator
+    public StorageDescriptorWrapper(@JsonProperty("columns") List<FieldSchemaWrapper> columns, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
+                                    @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
+                                    @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo,  @JsonProperty("sortCols") List<OrderWrapper> sortCols,
+                                    @JsonProperty("parameters") Map<String,String> parameters) {
+      this.columns = columns;
+      this.location = location;
+      this.inputFormat = inputFormat;
+      this.outputFormat = outputFormat;
+      this.compressed = compressed;
+      this.numBuckets = numBuckets;
+      this.serDeInfo = serDeInfo;
+      this.sortCols = sortCols;
+      this.parameters = parameters;
+      List<FieldSchema> colsUnwrapped;
+      if (columns != null) {
+        colsUnwrapped = Lists.newArrayList();
+        for (FieldSchemaWrapper fieldSchema : columns) {
+          colsUnwrapped.add(fieldSchema.getFieldSchema());
+        }
+      } else {
+        colsUnwrapped = null;
+      }
+      SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
+      List<Order> sortColsUnwrapped;
+      if (sortCols != null) {
+        sortColsUnwrapped = Lists.newArrayList();
+        for (OrderWrapper order : sortCols) {
+          sortColsUnwrapped.add(order.getOrder());
+        }
+      } else {
+        sortColsUnwrapped = null;
+      }
+      sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat,
+        compressed, numBuckets, serDeInfoUnwrapped, null, sortColsUnwrapped, parameters);
+    }
+
+    public StorageDescriptorWrapper(StorageDescriptor storageDescriptor) {
+      sd = storageDescriptor;
+      location = storageDescriptor.getLocation();
+      inputFormat = storageDescriptor.getInputFormat();
+      outputFormat = storageDescriptor.getOutputFormat();
+      compressed = storageDescriptor.isCompressed();
+      numBuckets = storageDescriptor.getNumBuckets();
+      serDeInfo = new SerDeInfoWrapper(storageDescriptor.getSerdeInfo());
+      if (sd.getSortCols() != null) {
+        sortCols = Lists.newArrayList();
+        for (Order order : sd.getSortCols()) {
+          sortCols.add(new OrderWrapper(order));
+        }
+      }
+      parameters = storageDescriptor.getParameters();
+      if (sd.getCols() != null) {
+        this.columns = Lists.newArrayList();
+        for (FieldSchema fieldSchema : sd.getCols()) {
+          this.columns.add(new FieldSchemaWrapper(fieldSchema));
+        }
+      }
+    }
+
+    @JsonIgnore
+    public StorageDescriptor getSd() {
+      return sd;
+    }
+  }
+
+  public static class SerDeInfoWrapper {
+    @JsonIgnore
+    private SerDeInfo serDeInfo;
+    @JsonProperty
+    public String name;
+    @JsonProperty
+    public String serializationLib;
+    @JsonProperty
+    public Map<String,String> parameters;
+
+    @JsonCreator
+    public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
+      this.name = name;
+      this.serializationLib = serializationLib;
+      this.parameters = parameters;
+      this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
+    }
+
+    public SerDeInfoWrapper(SerDeInfo serDeInfo) {
+      this.serDeInfo = serDeInfo;
+      this.name = serDeInfo.getName();
+      this.serializationLib = serDeInfo.getSerializationLib();
+      this.parameters = serDeInfo.getParameters();
+    }
+
+    @JsonIgnore
+    public SerDeInfo getSerDeInfo() {
+      return serDeInfo;
+    }
+  }
+
+  public static class FieldSchemaWrapper {
+    @JsonIgnore
+    private FieldSchema fieldSchema;
+    @JsonProperty
+    public String name;
+    @JsonProperty
+    public String type;
+    @JsonProperty
+    public String comment;
+
+    @JsonCreator
+    public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
+      this.name = name;
+      this.type = type;
+      this.comment = comment;
+      this.fieldSchema = new FieldSchema(name, type, comment);
+    }
+
+    public FieldSchemaWrapper(FieldSchema fieldSchema) {
+      this.fieldSchema = fieldSchema;
+      this.name = fieldSchema.getName();
+      this.type = fieldSchema.getType();
+      this.comment = fieldSchema.getComment();
+    }
+
+    @JsonIgnore
+    public FieldSchema getFieldSchema() {
+      return fieldSchema;
+    }
+  }
+
+  public static class OrderWrapper {
+    @JsonIgnore
+    private Order ord;
+    @JsonProperty
+    public String col;
+    @JsonProperty
+    public int order;
+
+    @JsonCreator
+    public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
+      this.col = col;
+      this.order = order;
+    }
+
+    public OrderWrapper(Order ord) {
+      this.ord = ord;
+      this.col = ord.getCol();
+      this.order = ord.getOrder();
+    }
+
+    @JsonIgnore
+    public Order getOrder() {
+      return ord;
+    }
+  }
+
+  public Map<String, String> getPartitionNameTypeMap() {
+    return partitionNameTypeMap;
+  }
+
+  /**
+   * Wrapper for {@link ColumnListsCache} class.
+   * Used in {@link HiveTableWrapper} for serialization and deserialization of {@link ColumnListsCache}.
+   */
+  public static class ColumnsCacheWrapper {
+    @JsonIgnore
+    private final ColumnListsCache columnListsCache;
+
+    @JsonProperty
+    private final List<List<FieldSchemaWrapper>> keys;
+
+    @JsonCreator
+    public ColumnsCacheWrapper(@JsonProperty("keys") List<List<FieldSchemaWrapper>> keys) {
+      this.keys = keys;
+      this.columnListsCache = new ColumnListsCache();
+      for (List<FieldSchemaWrapper> columns : keys) {
+        final List<FieldSchema> columnsUnwrapped = Lists.newArrayList();
+        for (FieldSchemaWrapper field : columns) {
+          columnsUnwrapped.add(field.getFieldSchema());
+        }
+        columnListsCache.addOrGet(columnsUnwrapped);
+      }
+    }
+
+    public ColumnsCacheWrapper(ColumnListsCache columnListsCache) {
+      this.columnListsCache = columnListsCache;
+      final List<List<FieldSchemaWrapper>> keysWrapped = Lists.newArrayList();
+      for (List<FieldSchema> columns : columnListsCache.getFields()) {
+        final List<FieldSchemaWrapper> columnsWrapped = Lists.newArrayList();
+        for (FieldSchema field : columns) {
+          columnsWrapped.add(new FieldSchemaWrapper(field));
+        }
+        keysWrapped.add(columnsWrapped);
+      }
+      this.keys = keysWrapped;
+    }
+
+    @JsonIgnore
+    public ColumnListsCache getColumnListsCache() {
+      return columnListsCache;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 2e23aff..1d5e6bf 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -71,6 +71,7 @@ import org.joda.time.DateTimeZone;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -398,12 +399,14 @@ public class HiveUtilities {
    * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table
    * to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
    *
-   * @param partition
-   * @param table
-   * @return
+   * @param partition the source of partition level parameters
+   * @param table     the source of table level parameters
+   * @return properties
    */
-  public static Properties getPartitionMetadata(final Partition partition, final Table table) {
-    final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+  public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
+    final Properties properties;
+    restoreColumns(table, partition);
+    properties = MetaStoreUtils.getPartitionMetadata(partition, table);
 
     // SerDe expects properties from Table, but above call doesn't add Table properties.
     // Include Table properties in final list in order to not to break SerDes that depend on
@@ -417,6 +420,34 @@ public class HiveUtilities {
     return properties;
   }
 
+  /**
+   * Sets columns from table cache to table and partition.
+   *
+   * @param partition partition which will set column list
+   * @param table     the source of column lists cache
+   */
+  public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
+    // exactly the same column lists for partitions or table
+    // stored only one time to reduce physical plan serialization
+    if (partition != null && partition.getSd().getCols() == null) {
+      partition.getSd().setCols(table.getColumnListsCache().getColumns(partition.getColumnListIndex()));
+    }
+    if (table.getSd().getCols() == null) {
+      table.getSd().setCols(table.getColumnListsCache().getColumns(0));
+    }
+  }
+
+  /**
+   * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
+   * which also sets columns from table cache to table and returns properties returned by
+   * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+   */
+  public static Properties getTableMetadata(HiveTableWithColumnCache table) {
+    restoreColumns(table, null);
+    return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table.getParameters(),
+      table.getDbName(), table.getTableName(), table.getPartitionKeys());
+  }
+
   public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
     StringBuilder errMsg = new StringBuilder();
     errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index 29f7757..af02c0a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -43,11 +44,11 @@ import com.google.common.collect.Lists;
 public class DrillHiveTable extends DrillTable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHiveTable.class);
 
-  protected final Table hiveTable;
+  protected final HiveTableWithColumnCache hiveTable;
 
   public DrillHiveTable(String storageEngineName, HiveStoragePlugin plugin, String userName, HiveReadEntry readEntry) {
     super(storageEngineName, plugin, userName, readEntry);
-    this.hiveTable = new Table(readEntry.getTable());
+    this.hiveTable = new HiveTableWithColumnCache(readEntry.getTable());
   }
 
   @Override
@@ -55,7 +56,7 @@ public class DrillHiveTable extends DrillTable{
     List<RelDataType> typeList = Lists.newArrayList();
     List<String> fieldNameList = Lists.newArrayList();
 
-    List<FieldSchema> hiveFields = hiveTable.getCols();
+    List<FieldSchema> hiveFields = hiveTable.getColumnListsCache().getColumns(0);
     for(FieldSchema hiveField : hiveFields) {
       fieldNameList.add(hiveField.getName());
       typeList.add(getNullableRelDataTypeFromHiveType(

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index 7ac1896..a32f538 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.exec.hive.HiveTestBase;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
+
 public class TestHivePartitionPruning extends HiveTestBase {
   // enable decimal data type
   @BeforeClass
@@ -149,6 +152,30 @@ public class TestHivePartitionPruning extends HiveTestBase {
         .go();
   }
 
+  @Test // DRILL-5032
+  public void testPartitionColumnsCaching() throws Exception {
+    final String query = "EXPLAIN PLAN FOR SELECT * FROM hive.partition_with_few_schemas";
+
+    List<QueryDataBatch> queryDataBatches = testSqlWithResults(query);
+    String resultString = getResultString(queryDataBatches, "|");
+
+    // different for both partitions column strings from physical plan
+    String columnString = "\"name\" : \"a\"";
+    String secondColumnString = "\"name\" : \"a1\"";
+
+    int columnIndex = resultString.indexOf(columnString);
+    assertTrue(columnIndex >= 0);
+    columnIndex = resultString.indexOf(columnString, columnIndex + 1);
+    // checks that column added to physical plan only one time
+    assertEquals(-1, columnIndex);
+
+    int secondColumnIndex = resultString.indexOf(secondColumnString);
+    assertTrue(secondColumnIndex >= 0);
+    secondColumnIndex = resultString.indexOf(secondColumnString, secondColumnIndex + 1);
+    // checks that column added to physical plan only one time
+    assertEquals(-1, secondColumnIndex);
+  }
+
   @AfterClass
   public static void disableDecimalDataType() throws Exception {
     test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 0a94867..fb4bb17 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -43,6 +43,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("hive.default", "kv_sh")
         .baselineValues("hive.default", "countstar_parquet")
         .baselineValues("hive.default", "simple_json")
+        .baselineValues("hive.default", "partition_with_few_schemas")
         .go();
 
     testBuilder()
@@ -243,6 +244,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("DRILL", "hive.default", "readtest_parquet", "TABLE")
         .baselineValues("DRILL", "hive.default", "hiveview", "VIEW")
         .baselineValues("DRILL", "hive.default", "partition_pruning_test", "TABLE")
+        .baselineValues("DRILL", "hive.default", "partition_with_few_schemas", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_parquet", "TABLE")
         .baselineValues("DRILL", "hive.default", "countstar_parquet", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_sh", "TABLE")

http://git-wip-us.apache.org/repos/asf/drill/blob/03928af0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 7a5b72d..435c66b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -438,6 +438,15 @@ public class HiveTestDataGenerator {
     executeQuery(hiveDriver, "INSERT OVERWRITE TABLE partition_pruning_test PARTITION(c, d, e) " +
         "SELECT a, b, c, d, e FROM partition_pruning_test_loadtable");
 
+    executeQuery(hiveDriver,
+      "CREATE TABLE IF NOT EXISTS partition_with_few_schemas(a DATE, b TIMESTAMP) "+
+        "partitioned by (c INT, d INT, e INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
+    executeQuery(hiveDriver, "INSERT OVERWRITE TABLE partition_with_few_schemas PARTITION(c, d, e) " +
+      "SELECT a, b, c, d, e FROM partition_pruning_test_loadtable");
+    executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=1, d=1, e=1) change a a1 INT");
+    executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=1, d=1, e=2) change a a1 INT");
+    executeQuery(hiveDriver,"alter table partition_with_few_schemas partition(c=2, d=2, e=2) change a a1 INT");
+
     // Add a partition with custom location
     executeQuery(hiveDriver,
         String.format("ALTER TABLE partition_pruning_test ADD PARTITION (c=99, d=98, e=97) LOCATION '%s'",