You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/12/15 18:37:45 UTC

drill git commit: DRILL-4194: Improve performance of the HiveScan metadata fetch operation

Repository: drill
Updated Branches:
  refs/heads/master 2953fe587 -> bc74629a5


DRILL-4194: Improve performance of the HiveScan metadata fetch operation

+ Use the stats (numRows) stored in Hive metastore whenever available to
  calculate the costs for planning purpose
+ Delay the costly operation of loading of InputSplits until needed. When
  InputSplits are loaded, cache them at query level to speedup subsequent
  access.

this closes #301


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bc74629a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bc74629a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bc74629a

Branch: refs/heads/master
Commit: bc74629a546afe109252dcf4e3ef00ffc22e7a7a
Parents: 2953fe5
Author: vkorukanti <ve...@gmail.com>
Authored: Fri Dec 11 11:36:11 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Dec 15 02:26:43 2015 -0800

----------------------------------------------------------------------
 ...onvertHiveParquetScanToDrillParquetScan.java |   3 +-
 .../store/hive/HiveDrillNativeParquetScan.java  |  16 +-
 .../exec/store/hive/HiveMetadataProvider.java   | 320 +++++++++++++++++++
 .../apache/drill/exec/store/hive/HiveScan.java  | 187 +++--------
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +-
 5 files changed, 377 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 14e4a6f..7f42336 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -222,7 +222,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
             hiveScan.getUserName(),
             hiveScan.hiveReadEntry,
             hiveScan.storagePlugin,
-            nativeScanCols);
+            nativeScanCols,
+            null);
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),

http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 41d06a9..17cae22 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
 
 import java.io.IOException;
 import java.util.List;
@@ -43,14 +44,13 @@ public class HiveDrillNativeParquetScan extends HiveScan {
                                     @JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
                                     @JsonProperty("storage-plugin") String storagePluginName,
                                     @JsonProperty("columns") List<SchemaPath> columns,
-                                    @JsonProperty("numPartitions") final int numPartitions,
                                     @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, storagePluginName, columns, numPartitions, pluginRegistry);
+    super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
   }
 
   public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin,
-      List<SchemaPath> columns) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, storagePlugin, columns);
+      List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
+    super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider);
   }
 
   public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
@@ -91,7 +91,7 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @Override
   public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider);
   }
 
   @Override
@@ -103,10 +103,12 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @Override
   public String toString() {
+    final List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+    int numPartitions = partitions == null ? 0 : partitions.size();
     return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
-        + ", inputSplits=" + inputSplits
         + ", columns=" + columns
         + ", numPartitions=" + numPartitions
-        + ", partitions= " + hiveReadEntry.getHivePartitionWrappers() +"]";
+        + ", partitions= " + partitions
+        + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
new file mode 100644
index 0000000..7b36796
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class which provides methods to get metadata of given Hive table selection. It tries to use the stats stored in
+ * MetaStore whenever available and delays the costly operation of loading of InputSplits until needed. When
+ * loaded, InputSplits are cached to speedup subsequent access.
+ */
+public class HiveMetadataProvider {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveMetadataProvider.class);
+
+  public static final int RECORD_SIZE = 1024;
+
+  private final HiveReadEntry hiveReadEntry;
+  private final UserGroupInformation ugi;
+  private final boolean isPartitionedTable;
+  private final Map<Partition, List<InputSplitWrapper>> partitionInputSplitMap;
+  private List<InputSplitWrapper> tableInputSplits;
+
+  private final Stopwatch watch = new Stopwatch();
+
+  public HiveMetadataProvider(final String userName, final HiveReadEntry hiveReadEntry) {
+    this.hiveReadEntry = hiveReadEntry;
+    this.ugi = ImpersonationUtil.createProxyUgi(userName);
+    isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
+    partitionInputSplitMap = Maps.newHashMap();
+  }
+
+  /**
+   * Return stats for table/partitions in given {@link HiveReadEntry}. If valid stats are available in MetaStore,
+   * return it. Otherwise estimate using the size of the input data.
+   *
+   * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this cache object.
+   * @return
+   * @throws IOException
+   */
+  public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
+    final Stopwatch timeGetStats = new Stopwatch().start();
+
+    final Table table = hiveReadEntry.getTable();
+    try {
+      if (!isPartitionedTable) {
+        final Properties properties = MetaStoreUtils.getTableMetadata(table);
+        final HiveStats stats = getStatsFromProps(properties);
+        if (stats.valid()) {
+          return stats;
+        }
+
+        // estimate the stats from the InputSplits.
+        return getStatsEstimateFromInputSplits(getTableInputSplits());
+      } else {
+        final HiveStats aggStats = new HiveStats(0, 0);
+        for(Partition partition : hiveReadEntry.getPartitions()) {
+          final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
+          HiveStats stats = getStatsFromProps(properties);
+
+          if (!stats.valid()) {
+            // estimate the stats from InputSplits
+            stats = getStatsEstimateFromInputSplits(getPartitionInputSplits(partition));
+          }
+          aggStats.add(stats);
+        }
+
+        return aggStats;
+      }
+    } catch (final Exception e) {
+      throw new IOException("Failed to get numRows from HiveTable", e);
+    } finally {
+      logger.debug("Took {} µs to get stats from {}.{}", timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
+          table.getDbName(), table.getTableName());
+    }
+  }
+
+  /** Helper method which return InputSplits for non-partitioned table */
+  private List<InputSplitWrapper> getTableInputSplits() throws Exception {
+    Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
+    if (tableInputSplits != null) {
+      return tableInputSplits;
+    }
+
+    final Properties properties = MetaStoreUtils.getTableMetadata(hiveReadEntry.getTable());
+    tableInputSplits = splitInputWithUGI(properties, hiveReadEntry.getTable().getSd(), null);
+
+    return tableInputSplits;
+  }
+
+  /** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
+   * metadata cache requests for the same partition(s).
+   */
+  private List<InputSplitWrapper> getPartitionInputSplits(final Partition partition) throws Exception {
+    if (partitionInputSplitMap.containsKey(partition)) {
+      return partitionInputSplitMap.get(partition);
+    }
+
+    final Properties properties = HiveUtilities.getPartitionMetadata(partition, hiveReadEntry.getTable());
+    final List<InputSplitWrapper> splits = splitInputWithUGI(properties, partition.getSd(), partition);
+    partitionInputSplitMap.put(partition, splits);
+
+    return splits;
+  }
+
+  /**
+   * Return {@link InputSplitWrapper}s for given {@link HiveReadEntry}. First splits are looked up in cache, if not
+   * found go through {@link InputFormat#getSplits(JobConf, int)} to find the splits.
+   *
+   * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
+   *
+   * @return
+   */
+  public List<InputSplitWrapper> getInputSplits(final HiveReadEntry hiveReadEntry) {
+    final Stopwatch timeGetSplits = new Stopwatch().start();
+    try {
+      if (!isPartitionedTable) {
+        return getTableInputSplits();
+      }
+
+      final List<InputSplitWrapper> splits = Lists.newArrayList();
+      for (Partition p : hiveReadEntry.getPartitions()) {
+        splits.addAll(getPartitionInputSplits(p));
+      }
+      return splits;
+    } catch (final Exception e) {
+      logger.error("Failed to get InputSplits", e);
+      throw new DrillRuntimeException("Failed to get InputSplits", e);
+    } finally {
+      logger.debug("Took {} µs to get InputSplits from {}.{}", timeGetSplits.elapsed(TimeUnit.NANOSECONDS) / 1000,
+          hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
+    }
+  }
+
+  /**
+   * Get the list of directories which contain the input files. This list is useful for explain plan purposes.
+   *
+   * @param hiveReadEntry {@link HiveReadEntry} containing the input table and/or partitions.
+   */
+  protected List<String> getInputDirectories(final HiveReadEntry hiveReadEntry) {
+    if (isPartitionedTable) {
+      final List<String> inputs = Lists.newArrayList();
+      for(Partition p : hiveReadEntry.getPartitions()) {
+        inputs.add(p.getSd().getLocation());
+      }
+      return inputs;
+    }
+
+    return Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
+  }
+
+  /**
+   * Get the stats from table properties. If not found -1 is returned for each stats field.
+   * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
+   * Hive table to have up-to-date stats.
+   * @param properties
+   * @return
+   */
+  private HiveStats getStatsFromProps(final Properties properties) {
+    long numRows = -1;
+    long sizeInBytes = -1;
+    try {
+      final String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
+      if (numRowsProp != null) {
+          numRows = Long.valueOf(numRowsProp);
+      }
+
+      final String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
+      if (sizeInBytesProp != null) {
+        sizeInBytes = Long.valueOf(numRowsProp);
+      }
+    } catch (final NumberFormatException e) {
+      logger.error("Failed to parse Hive stats in metastore.", e);
+      // continue with the defaults.
+    }
+
+    return new HiveStats(numRows, sizeInBytes);
+  }
+
+  /**
+   * Estimate the stats from the given list of InputSplits.
+   * @param inputSplits
+   * @return
+   * @throws IOException
+   */
+  private HiveStats getStatsEstimateFromInputSplits(final List<InputSplitWrapper> inputSplits) throws IOException {
+    long data = 0;
+    for (final InputSplitWrapper split : inputSplits) {
+      data += split.getSplit().getLength();
+    }
+
+    return new HiveStats(data/RECORD_SIZE, data);
+  }
+
+  private List<InputSplitWrapper> splitInputWithUGI(final Properties properties, final StorageDescriptor sd,
+      final Partition partition) throws Exception {
+    watch.start();
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<List<InputSplitWrapper>>() {
+        public List<InputSplitWrapper> run() throws Exception {
+          final List<InputSplitWrapper> splits = Lists.newArrayList();
+          final JobConf job = new JobConf();
+          HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
+          HiveUtilities.setInputFormatClass(job, sd, hiveReadEntry.getTable());
+          final Path path = new Path(sd.getLocation());
+          final FileSystem fs = path.getFileSystem(job);
+
+          if (fs.exists(path)) {
+            FileInputFormat.addInputPath(job, path);
+            final InputFormat format = job.getInputFormat();
+            for (final InputSplit split : format.getSplits(job, 1)) {
+              splits.add(new InputSplitWrapper(split, partition));
+            }
+          }
+
+          return splits;
+        }
+      });
+    } catch (final InterruptedException | IOException e) {
+      final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    } finally {
+      logger.trace("Took {} µs to get splits from {}", watch.elapsed(TimeUnit.NANOSECONDS) / 1000, sd.getLocation());
+      watch.stop();
+    }
+  }
+
+  /** Contains InputSplit along with the Partition. If non-partitioned tables, the partition field is null. */
+  public static class InputSplitWrapper {
+    private InputSplit split;
+    private Partition partition;
+
+    public InputSplitWrapper(final InputSplit split, final Partition partition) {
+      this.split = split;
+      this.partition = partition;
+    }
+
+    public InputSplit getSplit() {
+      return split;
+    }
+
+    public Partition getPartition() {
+      return partition;
+    }
+  }
+
+  /** Contains stats. Currently only numRows and totalSizeInBytes are used. */
+  public static class HiveStats {
+    private long numRows;
+    private long sizeInBytes;
+
+    public HiveStats(final long numRows, final long sizeInBytes) {
+      this.numRows = numRows;
+      this.sizeInBytes = sizeInBytes;
+    }
+
+    public long getNumRows() {
+      return numRows;
+    }
+
+    public long getSizeInBytes() {
+      return sizeInBytes;
+    }
+
+    /** Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid */
+    public boolean valid() {
+      return numRows > 0 && sizeInBytes > 0;
+    }
+
+    public void add(HiveStats s) {
+      numRows += s.numRows;
+      sizeInBytes += s.sizeInBytes;
+    }
+
+    @Override
+    public String toString() {
+      return "numRows: " + numRows + ", sizeInBytes: " + sizeInBytes;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index cd7d6e5..20c4e69 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -18,13 +18,10 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -41,19 +38,13 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
 import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -64,7 +55,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
-import org.apache.hadoop.security.UserGroupInformation;
 
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
@@ -74,155 +64,76 @@ public class HiveScan extends AbstractGroupScan {
 
   @JsonProperty("hive-table")
   public HiveReadEntry hiveReadEntry;
-  @JsonIgnore
-  protected List<InputSplit> inputSplits = Lists.newArrayList();
-  @JsonIgnore
-  public HiveStoragePlugin storagePlugin;
-  @JsonProperty("storage-plugin")
-  public String storagePluginName;
 
   @JsonIgnore
-  private final Collection<DrillbitEndpoint> endpoints;
+  public HiveStoragePlugin storagePlugin;
 
   @JsonProperty("columns")
   public List<SchemaPath> columns;
 
-  @JsonProperty("numPartitions")
-  public final int numPartitions;
-
   @JsonIgnore
-  List<List<InputSplit>> mappings;
+  protected final HiveMetadataProvider metadataProvider;
 
   @JsonIgnore
-  Map<InputSplit, Partition> partitionMap = new HashMap();
+  private List<List<InputSplitWrapper>> mappings;
 
-  /*
-   * total number of rows (obtained from metadata store)
-   */
   @JsonIgnore
-  private long rowCount = 0;
+  protected List<InputSplitWrapper> inputSplits;
 
   @JsonCreator
   public HiveScan(@JsonProperty("userName") final String userName,
                   @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
                   @JsonProperty("storage-plugin") final String storagePluginName,
                   @JsonProperty("columns") final List<SchemaPath> columns,
-                  @JsonProperty("numPartitions") final int numPartitions,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
-    super(userName);
-    this.hiveReadEntry = hiveReadEntry;
-    this.storagePluginName = storagePluginName;
-    this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
-    this.columns = columns;
-    getSplitsWithUGI();
-    endpoints = storagePlugin.getContext().getBits();
-    this.numPartitions = numPartitions;
+    this(userName, hiveReadEntry, (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName), columns, null);
   }
 
-  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin,
+      final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
-    getSplitsWithUGI();
-    endpoints = storagePlugin.getContext().getBits();
-    this.storagePluginName = storagePlugin.getName();
-    List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
-    numPartitions = partitions == null ? 0 : partitions.size();
+    if (metadataProvider == null) {
+      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry);
+    } else {
+      this.metadataProvider = metadataProvider;
+    }
   }
 
   public HiveScan(final HiveScan that) {
     super(that);
     this.columns = that.columns;
-    this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
-    this.inputSplits = that.inputSplits;
-    this.mappings = that.mappings;
-    this.partitionMap = that.partitionMap;
     this.storagePlugin = that.storagePlugin;
-    this.storagePluginName = that.storagePluginName;
-    this.rowCount = that.rowCount;
-    this.numPartitions = that.numPartitions;
+    this.metadataProvider = that.metadataProvider;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+    return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider);
   }
 
   public List<SchemaPath> getColumns() {
     return columns;
   }
 
-  private void getSplitsWithUGI() throws ExecutionSetupException {
-    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
-    try {
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          getSplits();
-          return null;
-        }
-      });
-    } catch (final InterruptedException | IOException e) {
-      final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
-      logger.error(errMsg, e);
-      throw new DrillRuntimeException(errMsg, e);
+  protected List<InputSplitWrapper> getInputSplits() {
+    if (inputSplits == null) {
+      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
-  }
 
-  private void getSplits() throws ExecutionSetupException {
-    try {
-      final List<Partition> partitions = hiveReadEntry.getPartitions();
-      final Table table = hiveReadEntry.getTable();
-      if (partitions == null || partitions.size() == 0) {
-        final Properties properties = MetaStoreUtils.getTableMetadata(table);
-        splitInput(properties, table.getSd(), null);
-      } else {
-        for (final Partition partition : partitions) {
-          final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
-          splitInput(properties, partition.getSd(), partition);
-        }
-      }
-    } catch (final Exception e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  /* Split the input given in StorageDescriptor */
-  private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
-      throws Exception {
-    final JobConf job = new JobConf();
-    HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
-    HiveUtilities.setInputFormatClass(job, sd, hiveReadEntry.getTable());
-    final Path path = new Path(sd.getLocation());
-    final FileSystem fs = path.getFileSystem(job);
-
-    if (fs.exists(path)) {
-      FileInputFormat.addInputPath(job, path);
-      final InputFormat format = job.getInputFormat();
-      for (final InputSplit split : format.getSplits(job, 1)) {
-        inputSplits.add(split);
-        partitionMap.put(split, partition);
-      }
-    }
-    final String numRowsProp = properties.getProperty("numRows");
-    logger.trace("HiveScan num rows property = {}", numRowsProp);
-    if (numRowsProp != null) {
-      final long numRows = Long.valueOf(numRowsProp);
-      // starting from hive-0.13, when no statistics are available, this property is set to -1
-      // it's important to note that the value returned by hive may not be up to date
-      if (numRows > 0) {
-        rowCount += numRows;
-      }
-    }
+    return inputSplits;
   }
 
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = Lists.newArrayList();
     for (int i = 0; i < endpoints.size(); i++) {
-      mappings.add(new ArrayList<InputSplit>());
+      mappings.add(new ArrayList<InputSplitWrapper>());
     }
     final int count = endpoints.size();
+    final List<InputSplitWrapper> inputSplits = getInputSplits();
     for (int i = 0; i < inputSplits.size(); i++) {
       mappings.get(i % count).add(inputSplits.get(i));
     }
@@ -239,20 +150,19 @@ public class HiveScan extends AbstractGroupScan {
   @Override
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
-      final List<InputSplit> splits = mappings.get(minorFragmentId);
+      final List<InputSplitWrapper> splits = mappings.get(minorFragmentId);
       List<HivePartition> parts = Lists.newArrayList();
       final List<String> encodedInputSplits = Lists.newArrayList();
       final List<String> splitTypes = Lists.newArrayList();
-      for (final InputSplit split : splits) {
-        HivePartition partition = null;
-        if (partitionMap.get(split) != null) {
-          partition = new HivePartition(partitionMap.get(split));
+      for (final InputSplitWrapper split : splits) {
+        if (split.getPartition() != null) {
+          parts.add(new HivePartition(split.getPartition()));
         }
-        parts.add(partition);
-        encodedInputSplits.add(serializeInputSplit(split));
-        splitTypes.add(split.getClass().getName());
+
+        encodedInputSplits.add(serializeInputSplit(split.getSplit()));
+        splitTypes.add(split.getSplit().getClass().getName());
       }
-      if (parts.contains(null)) {
+      if (parts.size() <= 0) {
         parts = null;
       }
 
@@ -265,25 +175,26 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public int getMaxParallelizationWidth() {
-    return inputSplits.size();
+    return getInputSplits().size();
   }
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-    for (final DrillbitEndpoint endpoint : endpoints) {
+    for (final DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
     final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
-      for (final InputSplit split : inputSplits) {
-        totalSize += Math.max(1, split.getLength());
+      final List<InputSplitWrapper> inputSplits = getInputSplits();
+      for (final InputSplitWrapper split : inputSplits) {
+        totalSize += Math.max(1, split.getSplit().getLength());
       }
-      for (final InputSplit split : inputSplits) {
-        final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
-        for (final String loc : split.getLocations()) {
+      for (final InputSplitWrapper split : inputSplits) {
+        final float affinity = ((float) Math.max(1, split.getSplit().getLength())) / totalSize;
+        for (final String loc : split.getSplit().getLocations()) {
           logger.debug("split location: {}", loc);
           final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
@@ -310,23 +221,14 @@ public class HiveScan extends AbstractGroupScan {
   @Override
   public ScanStats getScanStats() {
     try {
-      long data =0;
-      for (final InputSplit split : inputSplits) {
-          data += split.getLength();
-      }
+      final HiveStats stats = metadataProvider.getStats(hiveReadEntry);
 
-      long estRowCount = rowCount;
-      if (estRowCount == 0) {
-        // having a rowCount of 0 can mean the statistics were never computed
-        estRowCount = data/1024;
-      }
+      logger.debug("HiveStats: {}", stats.toString());
 
       // Hive's native reader is neither memory efficient nor fast. Increase the CPU cost
       // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
       float cpuCost = 1 * getSerDeOverheadFactor();
-
-      logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
-      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuCost, data);
+      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, stats.getNumRows(), cpuCost, stats.getSizeInBytes());
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
@@ -359,10 +261,11 @@ public class HiveScan extends AbstractGroupScan {
     List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
     int numPartitions = partitions == null ? 0 : partitions.size();
     return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
-        + ", inputSplits=" + inputSplits
         + ", columns=" + columns
         + ", numPartitions=" + numPartitions
-        + ", partitions= " + partitions +"]";
+        + ", partitions= " + partitions
+        + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
+        + "]";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index c1b6a0c..c967c00 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -79,7 +79,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             "Querying views created in Hive from Drill is not supported in current version.");
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns);
+      return new HiveScan(userName, hiveReadEntry, this, columns, null);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }