Posted to commits@drill.apache.org by ve...@apache.org on 2015/12/15 18:37:45 UTC
drill git commit: DRILL-4194: Improve performance of the HiveScan metadata fetch operation
Repository: drill
Updated Branches:
refs/heads/master 2953fe587 -> bc74629a5
DRILL-4194: Improve performance of the HiveScan metadata fetch operation
+ Use the stats (numRows) stored in the Hive metastore, whenever available, to calculate costs for planning purposes.
+ Delay the costly operation of loading InputSplits until needed. When InputSplits are loaded, cache them at the query level to speed up subsequent access.
This closes #301
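A rough sketch of the approach in plain Java follows. The class, field, and property names here are hypothetical stand-ins rather than Drill's actual types: it prefers metastore stats when they are valid, falls back to an estimate derived from the total input size, and lists the splits at most once, caching the result for later calls.

import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

class LazyHiveStatsSketch {
  static final int ESTIMATED_RECORD_SIZE = 1024;          // rough bytes-per-row guess for the fallback estimate

  private final Supplier<List<Long>> splitLengthLoader;   // expensive split listing, run at most once
  private List<Long> cachedSplitLengths;                  // stands in for the per-query InputSplit cache

  LazyHiveStatsSketch(Supplier<List<Long>> splitLengthLoader) {
    this.splitLengthLoader = splitLengthLoader;
  }

  /** Row count and total size: metastore values when valid, otherwise estimated from split sizes. */
  long[] stats(Properties tableProps) {
    final long numRows = parseOrDefault(tableProps.getProperty("numRows"), -1L);
    final long totalSize = parseOrDefault(tableProps.getProperty("totalSize"), -1L);
    if (numRows > 0 && totalSize > 0) {
      return new long[] { numRows, totalSize };            // stats from the metastore are usable as-is
    }
    long bytes = 0;
    for (long len : splits()) {                            // only now do we pay for the split listing
      bytes += len;
    }
    return new long[] { bytes / ESTIMATED_RECORD_SIZE, bytes };
  }

  /** Load split lengths lazily and reuse them for the rest of planning. */
  private List<Long> splits() {
    if (cachedSplitLengths == null) {
      cachedSplitLengths = splitLengthLoader.get();
    }
    return cachedSplitLengths;
  }

  private static long parseOrDefault(String value, long fallback) {
    try {
      return value == null ? fallback : Long.parseLong(value);
    } catch (NumberFormatException e) {
      return fallback;
    }
  }
}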
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bc74629a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bc74629a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bc74629a
Branch: refs/heads/master
Commit: bc74629a546afe109252dcf4e3ef00ffc22e7a7a
Parents: 2953fe5
Author: vkorukanti <ve...@gmail.com>
Authored: Fri Dec 11 11:36:11 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Dec 15 02:26:43 2015 -0800
----------------------------------------------------------------------
...onvertHiveParquetScanToDrillParquetScan.java | 3 +-
.../store/hive/HiveDrillNativeParquetScan.java | 16 +-
.../exec/store/hive/HiveMetadataProvider.java | 320 +++++++++++++++++++
.../apache/drill/exec/store/hive/HiveScan.java | 187 +++--------
.../exec/store/hive/HiveStoragePlugin.java | 2 +-
5 files changed, 377 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 14e4a6f..7f42336 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -222,7 +222,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
hiveScan.getUserName(),
hiveScan.hiveReadEntry,
hiveScan.storagePlugin,
- nativeScanCols);
+ nativeScanCols,
+ null);
return new DrillScanRel(
hiveScanRel.getCluster(),
http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 41d06a9..17cae22 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
import java.io.IOException;
import java.util.List;
@@ -43,14 +44,13 @@ public class HiveDrillNativeParquetScan extends HiveScan {
@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
@JsonProperty("storage-plugin") String storagePluginName,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("numPartitions") final int numPartitions,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
- super(userName, hiveReadEntry, storagePluginName, columns, numPartitions, pluginRegistry);
+ super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
}
public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin,
- List<SchemaPath> columns) throws ExecutionSetupException {
- super(userName, hiveReadEntry, storagePlugin, columns);
+ List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
+ super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider);
}
public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
@@ -91,7 +91,7 @@ public class HiveDrillNativeParquetScan extends HiveScan {
@Override
public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+ return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider);
}
@Override
@@ -103,10 +103,12 @@ public class HiveDrillNativeParquetScan extends HiveScan {
@Override
public String toString() {
+ final List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
+ int numPartitions = partitions == null ? 0 : partitions.size();
return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
- + ", inputSplits=" + inputSplits
+ ", columns=" + columns
+ ", numPartitions=" + numPartitions
- + ", partitions= " + hiveReadEntry.getHivePartitionWrappers() +"]";
+ + ", partitions= " + partitions
+ + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + "]";
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
new file mode 100644
index 0000000..7b36796
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class which provides methods to get metadata of a given Hive table selection. It tries to use the stats stored in
+ * the MetaStore whenever available and delays the costly operation of loading InputSplits until needed. Once
+ * loaded, InputSplits are cached to speed up subsequent access.
+ */
+public class HiveMetadataProvider {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveMetadataProvider.class);
+
+ public static final int RECORD_SIZE = 1024;
+
+ private final HiveReadEntry hiveReadEntry;
+ private final UserGroupInformation ugi;
+ private final boolean isPartitionedTable;
+ private final Map<Partition, List<InputSplitWrapper>> partitionInputSplitMap;
+ private List<InputSplitWrapper> tableInputSplits;
+
+ private final Stopwatch watch = new Stopwatch();
+
+ public HiveMetadataProvider(final String userName, final HiveReadEntry hiveReadEntry) {
+ this.hiveReadEntry = hiveReadEntry;
+ this.ugi = ImpersonationUtil.createProxyUgi(userName);
+ isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
+ partitionInputSplitMap = Maps.newHashMap();
+ }
+
+ /**
+ * Return stats for the table/partitions in the given {@link HiveReadEntry}. If valid stats are available in the
+ * MetaStore, return them. Otherwise estimate the stats using the size of the input data.
+ *
+ * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this cache object.
+ * @return {@link HiveStats} for the selection
+ * @throws IOException
+ */
+ public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
+ final Stopwatch timeGetStats = new Stopwatch().start();
+
+ final Table table = hiveReadEntry.getTable();
+ try {
+ if (!isPartitionedTable) {
+ final Properties properties = MetaStoreUtils.getTableMetadata(table);
+ final HiveStats stats = getStatsFromProps(properties);
+ if (stats.valid()) {
+ return stats;
+ }
+
+ // estimate the stats from the InputSplits.
+ return getStatsEstimateFromInputSplits(getTableInputSplits());
+ } else {
+ final HiveStats aggStats = new HiveStats(0, 0);
+ for(Partition partition : hiveReadEntry.getPartitions()) {
+ final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
+ HiveStats stats = getStatsFromProps(properties);
+
+ if (!stats.valid()) {
+ // estimate the stats from InputSplits
+ stats = getStatsEstimateFromInputSplits(getPartitionInputSplits(partition));
+ }
+ aggStats.add(stats);
+ }
+
+ return aggStats;
+ }
+ } catch (final Exception e) {
+ throw new IOException("Failed to get numRows from HiveTable", e);
+ } finally {
+ logger.debug("Took {} µs to get stats from {}.{}", timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
+ table.getDbName(), table.getTableName());
+ }
+ }
+
+ /** Helper method which returns InputSplits for a non-partitioned table. */
+ private List<InputSplitWrapper> getTableInputSplits() throws Exception {
+ Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
+ if (tableInputSplits != null) {
+ return tableInputSplits;
+ }
+
+ final Properties properties = MetaStoreUtils.getTableMetadata(hiveReadEntry.getTable());
+ tableInputSplits = splitInputWithUGI(properties, hiveReadEntry.getTable().getSd(), null);
+
+ return tableInputSplits;
+ }
+
+ /** Helper method which returns the InputSplits for a given partition. InputSplits are cached to speed up subsequent
+ * metadata cache requests for the same partition(s).
+ */
+ private List<InputSplitWrapper> getPartitionInputSplits(final Partition partition) throws Exception {
+ if (partitionInputSplitMap.containsKey(partition)) {
+ return partitionInputSplitMap.get(partition);
+ }
+
+ final Properties properties = HiveUtilities.getPartitionMetadata(partition, hiveReadEntry.getTable());
+ final List<InputSplitWrapper> splits = splitInputWithUGI(properties, partition.getSd(), partition);
+ partitionInputSplitMap.put(partition, splits);
+
+ return splits;
+ }
+
+ /**
+ * Return {@link InputSplitWrapper}s for the given {@link HiveReadEntry}. Splits are first looked up in the cache;
+ * if not found, {@link InputFormat#getSplits(JobConf, int)} is used to find them.
+ *
+ * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
+ *
+ * @return List of {@link InputSplitWrapper}s for the selection
+ */
+ public List<InputSplitWrapper> getInputSplits(final HiveReadEntry hiveReadEntry) {
+ final Stopwatch timeGetSplits = new Stopwatch().start();
+ try {
+ if (!isPartitionedTable) {
+ return getTableInputSplits();
+ }
+
+ final List<InputSplitWrapper> splits = Lists.newArrayList();
+ for (Partition p : hiveReadEntry.getPartitions()) {
+ splits.addAll(getPartitionInputSplits(p));
+ }
+ return splits;
+ } catch (final Exception e) {
+ logger.error("Failed to get InputSplits", e);
+ throw new DrillRuntimeException("Failed to get InputSplits", e);
+ } finally {
+ logger.debug("Took {} µs to get InputSplits from {}.{}", timeGetSplits.elapsed(TimeUnit.NANOSECONDS) / 1000,
+ hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
+ }
+ }
+
+ /**
+ * Get the list of directories which contain the input files. This list is useful for explain plan purposes.
+ *
+ * @param hiveReadEntry {@link HiveReadEntry} containing the input table and/or partitions.
+ */
+ protected List<String> getInputDirectories(final HiveReadEntry hiveReadEntry) {
+ if (isPartitionedTable) {
+ final List<String> inputs = Lists.newArrayList();
+ for(Partition p : hiveReadEntry.getPartitions()) {
+ inputs.add(p.getSd().getLocation());
+ }
+ return inputs;
+ }
+
+ return Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
+ }
+
+ /**
+ * Get the stats from table properties. If not found, -1 is returned for each stats field.
+ * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
+ * the Hive table to keep the stats up to date.
+ * @param properties
+ * @return {@link HiveStats} with -1 for any field that could not be read
+ */
+ private HiveStats getStatsFromProps(final Properties properties) {
+ long numRows = -1;
+ long sizeInBytes = -1;
+ try {
+ final String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
+ if (numRowsProp != null) {
+ numRows = Long.valueOf(numRowsProp);
+ }
+
+ final String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
+ if (sizeInBytesProp != null) {
+ sizeInBytes = Long.valueOf(sizeInBytesProp);
+ }
+ } catch (final NumberFormatException e) {
+ logger.error("Failed to parse Hive stats in metastore.", e);
+ // continue with the defaults.
+ }
+
+ return new HiveStats(numRows, sizeInBytes);
+ }
+
+ /**
+ * Estimate the stats from the given list of InputSplits.
+ * @param inputSplits
+ * @return
+ * @throws IOException
+ */
+ private HiveStats getStatsEstimateFromInputSplits(final List<InputSplitWrapper> inputSplits) throws IOException {
+ long data = 0;
+ for (final InputSplitWrapper split : inputSplits) {
+ data += split.getSplit().getLength();
+ }
+
+ return new HiveStats(data/RECORD_SIZE, data);
+ }
+
+ private List<InputSplitWrapper> splitInputWithUGI(final Properties properties, final StorageDescriptor sd,
+ final Partition partition) throws Exception {
+ watch.start();
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<List<InputSplitWrapper>>() {
+ public List<InputSplitWrapper> run() throws Exception {
+ final List<InputSplitWrapper> splits = Lists.newArrayList();
+ final JobConf job = new JobConf();
+ HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
+ HiveUtilities.setInputFormatClass(job, sd, hiveReadEntry.getTable());
+ final Path path = new Path(sd.getLocation());
+ final FileSystem fs = path.getFileSystem(job);
+
+ if (fs.exists(path)) {
+ FileInputFormat.addInputPath(job, path);
+ final InputFormat format = job.getInputFormat();
+ for (final InputSplit split : format.getSplits(job, 1)) {
+ splits.add(new InputSplitWrapper(split, partition));
+ }
+ }
+
+ return splits;
+ }
+ });
+ } catch (final InterruptedException | IOException e) {
+ final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ } finally {
+ logger.trace("Took {} µs to get splits from {}", watch.elapsed(TimeUnit.NANOSECONDS) / 1000, sd.getLocation());
+ watch.stop();
+ }
+ }
+
+ /** Contains an InputSplit along with its Partition. For non-partitioned tables, the partition field is null. */
+ public static class InputSplitWrapper {
+ private InputSplit split;
+ private Partition partition;
+
+ public InputSplitWrapper(final InputSplit split, final Partition partition) {
+ this.split = split;
+ this.partition = partition;
+ }
+
+ public InputSplit getSplit() {
+ return split;
+ }
+
+ public Partition getPartition() {
+ return partition;
+ }
+ }
+
+ /** Contains stats. Currently only numRows and sizeInBytes are used. */
+ public static class HiveStats {
+ private long numRows;
+ private long sizeInBytes;
+
+ public HiveStats(final long numRows, final long sizeInBytes) {
+ this.numRows = numRows;
+ this.sizeInBytes = sizeInBytes;
+ }
+
+ public long getNumRows() {
+ return numRows;
+ }
+
+ public long getSizeInBytes() {
+ return sizeInBytes;
+ }
+
+ /** Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid */
+ public boolean valid() {
+ return numRows > 0 && sizeInBytes > 0;
+ }
+
+ public void add(HiveStats s) {
+ numRows += s.numRows;
+ sizeInBytes += s.sizeInBytes;
+ }
+
+ @Override
+ public String toString() {
+ return "numRows: " + numRows + ", sizeInBytes: " + sizeInBytes;
+ }
+ }
+}
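For illustration only, a hypothetical usage sketch of the HiveMetadataProvider added above. It uses just the constructor and public methods shown in the diff, and assumes a HiveReadEntry obtained elsewhere from Drill's planning code (so it only compiles and runs against the Drill 1.4-era classpath):

import java.io.IOException;
import java.util.List;

import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
import org.apache.drill.exec.store.hive.HiveReadEntry;

class HiveMetadataProviderUsageSketch {
  void duringPlanning(String userName, HiveReadEntry entry) throws IOException {
    final HiveMetadataProvider provider = new HiveMetadataProvider(userName, entry);

    // Cheap: prefers numRows/totalSize from the metastore, estimates from split sizes otherwise.
    final HiveStats stats = provider.getStats(entry);
    System.out.println(stats);    // e.g. "numRows: 1000, sizeInBytes: 1048576"

    // Expensive: splits are listed on first use and then served from the per-query cache,
    // so parallelization width and affinity calculations reuse the same list.
    final List<InputSplitWrapper> splits = provider.getInputSplits(entry);
    System.out.println("number of input splits: " + splits.size());
  }
}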
http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index cd7d6e5..20c4e69 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -18,13 +18,10 @@
package org.apache.drill.exec.store.hive;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -41,19 +38,13 @@ import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -64,7 +55,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
-import org.apache.hadoop.security.UserGroupInformation;
@JsonTypeName("hive-scan")
public class HiveScan extends AbstractGroupScan {
@@ -74,155 +64,76 @@ public class HiveScan extends AbstractGroupScan {
@JsonProperty("hive-table")
public HiveReadEntry hiveReadEntry;
- @JsonIgnore
- protected List<InputSplit> inputSplits = Lists.newArrayList();
- @JsonIgnore
- public HiveStoragePlugin storagePlugin;
- @JsonProperty("storage-plugin")
- public String storagePluginName;
@JsonIgnore
- private final Collection<DrillbitEndpoint> endpoints;
+ public HiveStoragePlugin storagePlugin;
@JsonProperty("columns")
public List<SchemaPath> columns;
- @JsonProperty("numPartitions")
- public final int numPartitions;
-
@JsonIgnore
- List<List<InputSplit>> mappings;
+ protected final HiveMetadataProvider metadataProvider;
@JsonIgnore
- Map<InputSplit, Partition> partitionMap = new HashMap();
+ private List<List<InputSplitWrapper>> mappings;
- /*
- * total number of rows (obtained from metadata store)
- */
@JsonIgnore
- private long rowCount = 0;
+ protected List<InputSplitWrapper> inputSplits;
@JsonCreator
public HiveScan(@JsonProperty("userName") final String userName,
@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
@JsonProperty("storage-plugin") final String storagePluginName,
@JsonProperty("columns") final List<SchemaPath> columns,
- @JsonProperty("numPartitions") final int numPartitions,
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
- super(userName);
- this.hiveReadEntry = hiveReadEntry;
- this.storagePluginName = storagePluginName;
- this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
- this.columns = columns;
- getSplitsWithUGI();
- endpoints = storagePlugin.getContext().getBits();
- this.numPartitions = numPartitions;
+ this(userName, hiveReadEntry, (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName), columns, null);
}
- public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+ public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin,
+ final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.storagePlugin = storagePlugin;
- getSplitsWithUGI();
- endpoints = storagePlugin.getContext().getBits();
- this.storagePluginName = storagePlugin.getName();
- List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
- numPartitions = partitions == null ? 0 : partitions.size();
+ if (metadataProvider == null) {
+ this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry);
+ } else {
+ this.metadataProvider = metadataProvider;
+ }
}
public HiveScan(final HiveScan that) {
super(that);
this.columns = that.columns;
- this.endpoints = that.endpoints;
this.hiveReadEntry = that.hiveReadEntry;
- this.inputSplits = that.inputSplits;
- this.mappings = that.mappings;
- this.partitionMap = that.partitionMap;
this.storagePlugin = that.storagePlugin;
- this.storagePluginName = that.storagePluginName;
- this.rowCount = that.rowCount;
- this.numPartitions = that.numPartitions;
+ this.metadataProvider = that.metadataProvider;
}
public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+ return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider);
}
public List<SchemaPath> getColumns() {
return columns;
}
- private void getSplitsWithUGI() throws ExecutionSetupException {
- final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
- try {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws Exception {
- getSplits();
- return null;
- }
- });
- } catch (final InterruptedException | IOException e) {
- final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
- logger.error(errMsg, e);
- throw new DrillRuntimeException(errMsg, e);
+ protected List<InputSplitWrapper> getInputSplits() {
+ if (inputSplits == null) {
+ inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
}
- }
- private void getSplits() throws ExecutionSetupException {
- try {
- final List<Partition> partitions = hiveReadEntry.getPartitions();
- final Table table = hiveReadEntry.getTable();
- if (partitions == null || partitions.size() == 0) {
- final Properties properties = MetaStoreUtils.getTableMetadata(table);
- splitInput(properties, table.getSd(), null);
- } else {
- for (final Partition partition : partitions) {
- final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
- splitInput(properties, partition.getSd(), partition);
- }
- }
- } catch (final Exception e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- /* Split the input given in StorageDescriptor */
- private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
- throws Exception {
- final JobConf job = new JobConf();
- HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
- HiveUtilities.setInputFormatClass(job, sd, hiveReadEntry.getTable());
- final Path path = new Path(sd.getLocation());
- final FileSystem fs = path.getFileSystem(job);
-
- if (fs.exists(path)) {
- FileInputFormat.addInputPath(job, path);
- final InputFormat format = job.getInputFormat();
- for (final InputSplit split : format.getSplits(job, 1)) {
- inputSplits.add(split);
- partitionMap.put(split, partition);
- }
- }
- final String numRowsProp = properties.getProperty("numRows");
- logger.trace("HiveScan num rows property = {}", numRowsProp);
- if (numRowsProp != null) {
- final long numRows = Long.valueOf(numRowsProp);
- // starting from hive-0.13, when no statistics are available, this property is set to -1
- // it's important to note that the value returned by hive may not be up to date
- if (numRows > 0) {
- rowCount += numRows;
- }
- }
+ return inputSplits;
}
@Override
public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = Lists.newArrayList();
for (int i = 0; i < endpoints.size(); i++) {
- mappings.add(new ArrayList<InputSplit>());
+ mappings.add(new ArrayList<InputSplitWrapper>());
}
final int count = endpoints.size();
+ final List<InputSplitWrapper> inputSplits = getInputSplits();
for (int i = 0; i < inputSplits.size(); i++) {
mappings.get(i % count).add(inputSplits.get(i));
}
@@ -239,20 +150,19 @@ public class HiveScan extends AbstractGroupScan {
@Override
public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
- final List<InputSplit> splits = mappings.get(minorFragmentId);
+ final List<InputSplitWrapper> splits = mappings.get(minorFragmentId);
List<HivePartition> parts = Lists.newArrayList();
final List<String> encodedInputSplits = Lists.newArrayList();
final List<String> splitTypes = Lists.newArrayList();
- for (final InputSplit split : splits) {
- HivePartition partition = null;
- if (partitionMap.get(split) != null) {
- partition = new HivePartition(partitionMap.get(split));
+ for (final InputSplitWrapper split : splits) {
+ if (split.getPartition() != null) {
+ parts.add(new HivePartition(split.getPartition()));
}
- parts.add(partition);
- encodedInputSplits.add(serializeInputSplit(split));
- splitTypes.add(split.getClass().getName());
+
+ encodedInputSplits.add(serializeInputSplit(split.getSplit()));
+ splitTypes.add(split.getSplit().getClass().getName());
}
- if (parts.contains(null)) {
+ if (parts.size() <= 0) {
parts = null;
}
@@ -265,25 +175,26 @@ public class HiveScan extends AbstractGroupScan {
@Override
public int getMaxParallelizationWidth() {
- return inputSplits.size();
+ return getInputSplits().size();
}
@Override
public List<EndpointAffinity> getOperatorAffinity() {
final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
- for (final DrillbitEndpoint endpoint : endpoints) {
+ for (final DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) {
endpointMap.put(endpoint.getAddress(), endpoint);
logger.debug("endpoing address: {}", endpoint.getAddress());
}
final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
try {
long totalSize = 0;
- for (final InputSplit split : inputSplits) {
- totalSize += Math.max(1, split.getLength());
+ final List<InputSplitWrapper> inputSplits = getInputSplits();
+ for (final InputSplitWrapper split : inputSplits) {
+ totalSize += Math.max(1, split.getSplit().getLength());
}
- for (final InputSplit split : inputSplits) {
- final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
- for (final String loc : split.getLocations()) {
+ for (final InputSplitWrapper split : inputSplits) {
+ final float affinity = ((float) Math.max(1, split.getSplit().getLength())) / totalSize;
+ for (final String loc : split.getSplit().getLocations()) {
logger.debug("split location: {}", loc);
final DrillbitEndpoint endpoint = endpointMap.get(loc);
if (endpoint != null) {
@@ -310,23 +221,14 @@ public class HiveScan extends AbstractGroupScan {
@Override
public ScanStats getScanStats() {
try {
- long data =0;
- for (final InputSplit split : inputSplits) {
- data += split.getLength();
- }
+ final HiveStats stats = metadataProvider.getStats(hiveReadEntry);
- long estRowCount = rowCount;
- if (estRowCount == 0) {
- // having a rowCount of 0 can mean the statistics were never computed
- estRowCount = data/1024;
- }
+ logger.debug("HiveStats: {}", stats.toString());
// Hive's native reader is neither memory efficient nor fast. Increase the CPU cost
// by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
float cpuCost = 1 * getSerDeOverheadFactor();
-
- logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuCost, data);
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, stats.getNumRows(), cpuCost, stats.getSizeInBytes());
} catch (final IOException e) {
throw new DrillRuntimeException(e);
}
@@ -359,10 +261,11 @@ public class HiveScan extends AbstractGroupScan {
List<HivePartition> partitions = hiveReadEntry.getHivePartitionWrappers();
int numPartitions = partitions == null ? 0 : partitions.size();
return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
- + ", inputSplits=" + inputSplits
+ ", columns=" + columns
+ ", numPartitions=" + numPartitions
- + ", partitions= " + partitions +"]";
+ + ", partitions= " + partitions
+ + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
+ + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bc74629a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index c1b6a0c..c967c00 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -79,7 +79,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
"Querying views created in Hive from Drill is not supported in current version.");
}
- return new HiveScan(userName, hiveReadEntry, this, columns);
+ return new HiveScan(userName, hiveReadEntry, this, columns, null);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}