Posted to commits@hive.apache.org by he...@apache.org on 2011/10/31 18:30:22 UTC

svn commit: r1195577 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ jdbc/src/java/org/apache/hadoop/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hiv...

Author: heyongqiang
Date: Mon Oct 31 17:30:21 2011
New Revision: 1195577

URL: http://svn.apache.org/viewvc?rev=1195577&view=rev
Log:
HIVE-1003: optimize metadata only queries (Marcin Kurczych, Namit Jain via He Yongqiang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/OneNullRowInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/metadataonly1.q
    hive/trunk/ql/src/test/results/clientpositive/metadataonly1.q.out
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDatabaseMetaData.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 31 17:30:21 2011
@@ -378,6 +378,7 @@ public class HiveConf extends Configurat
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEPPDRECOGNIZETRANSITIVITY("hive.ppd.recognizetransivity", true), // predicate pushdown
     HIVEPPDREMOVEDUPLICATEFILTERS("hive.ppd.remove.duplicatefilters", true),
+    HIVEMETADATAONLYQUERIES("hive.optimize.metadataonly", true),
     // push predicates down to storage handlers
     HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
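
For reference, the new flag added above can be toggled per session from the Hive CLI; a minimal sketch (the property name hive.optimize.metadataonly is taken from the diff above and defaults to true):

    -- disable the metadata-only rewrite for the current session
    set hive.optimize.metadataonly=false;
    -- restore the default behaviour
    set hive.optimize.metadataonly=true;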

Modified: hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDatabaseMetaData.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDatabaseMetaData.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDatabaseMetaData.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveDatabaseMetaData.java Mon Oct 31 17:30:21 2011
@@ -106,7 +106,7 @@ public class HiveDatabaseMetaData implem
   public ResultSet getCatalogs() throws SQLException {
     try {
       // TODO a client call to get the schema's after HIVE-675 is implemented
-      final List<String> catalogs = new ArrayList();
+      final List<String> catalogs = new ArrayList<String>();
       catalogs.add("default");
       return new HiveMetaDataResultSet<String>(Arrays.asList("TABLE_CAT")
               , Arrays.asList("STRING")
@@ -571,7 +571,7 @@ public class HiveDatabaseMetaData implem
   public ResultSet getTables(String catalog, String schemaPattern,
                              String tableNamePattern, String[] types) throws SQLException {
     final List<String> tablesstr;
-    final List<JdbcTable> resultTables = new ArrayList();
+    final List<JdbcTable> resultTables = new ArrayList<JdbcTable>();
     final String resultCatalog;
     if (catalog==null) { // On jdbc the default catalog is null but on hive it's "default"
       resultCatalog = "default";

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Mon Oct 31 17:30:21 2011
@@ -29,6 +29,7 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -770,6 +771,11 @@ public class ExecDriver extends Task<Map
   }
 
   @Override
+  public Collection<Operator<? extends Serializable>> getTopOperators() {
+    return getWork().getAliasToWork().values();
+  }
+  
+  @Override
   public boolean hasReduce() {
     MapredWork w = getWork();
     return w.getReducer() != null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Mon Oct 31 17:30:21 2011
@@ -539,4 +539,9 @@ public class MapRedTask extends ExecDriv
 
     return null;
   }
+  
+  @Override
+  public Operator<? extends Serializable> getReducer() {
+    return getWork().getReducer();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Mon Oct 31 17:30:21 2011
@@ -26,6 +26,7 @@ import java.lang.management.MemoryMXBean
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -452,6 +453,11 @@ public class MapredLocalTask extends Tas
   }
 
   @Override
+  public Collection<Operator<? extends Serializable>> getTopOperators() {
+    return getWork().getAliasToWork().values();
+  }
+  
+  @Override
   public String getName() {
     return "MAPREDLOCAL";
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Mon Oct 31 17:30:21 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -348,10 +349,18 @@ public abstract class Task<T extends Ser
     return false;
   }
 
+  public Collection<Operator<? extends Serializable>> getTopOperators() {
+    return new LinkedList<Operator<? extends Serializable>>();
+  }
+  
   public boolean hasReduce() {
     return false;
   }
 
+  public Operator<? extends Serializable> getReducer() {
+    return null;
+  }
+  
   public HashMap<String, Long> getCounters() {
     return taskCounters;
   }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/OneNullRowInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/OneNullRowInputFormat.java?rev=1195577&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/OneNullRowInputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/OneNullRowInputFormat.java Mon Oct 31 17:30:21 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * OneNullRowInputFormat outputs exactly one null row. It is used in the
+ * implementation of metadata-only queries.
+ *
+ */
+public class OneNullRowInputFormat implements
+    InputFormat<NullWritable, NullWritable>, JobConfigurable {
+  private static final Log LOG = LogFactory.getLog(OneNullRowInputFormat.class
+      .getName());
+  MapredWork mrwork = null;
+  List<String> partitions;
+  long len;
+
+  static public class DummyInputSplit implements InputSplit {
+    public DummyInputSplit() {
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return 1;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return new String[0];
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+    }
+
+  }
+
+  static public class OneNullRowRecordReader implements RecordReader<NullWritable, NullWritable> {
+    private boolean processed = false;
+    public OneNullRowRecordReader() {
+    }
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public NullWritable createValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (processed ? 1 : 0);
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return (float) (processed ? 1.0 : 0.0);
+    }
+
+    @Override
+    public boolean next(NullWritable arg0, NullWritable arg1) throws IOException {
+      if(processed) {
+        return false;
+      } else {
+        processed = true;
+        return true;
+      }
+    }
+
+  }
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2)
+      throws IOException {
+    return new OneNullRowRecordReader();
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf arg0, int arg1) throws IOException {
+    InputSplit[] ret = new InputSplit[1];
+    ret[0] = new DummyInputSplit();
+    LOG.info("Calculating splits");
+    return ret;
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    LOG.info("Using one null row input format");
+  }
+
+}
\ No newline at end of file

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java?rev=1195577&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java Mon Oct 31 17:30:21 2011
@@ -0,0 +1,284 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
+import org.apache.hadoop.hive.serde2.NullStructSerDe;
+
+/**
+ * 
+ * MetadataOnlyOptimizer determines to which TableScanOperators the "metadata only"
+ * optimization can be applied. Such an operator must use only partition columns
+ * (this is easy to check because we run after column pruning), and every place
+ * where the operator's data is consumed must go through a GroupByOperator with
+ * distinct or distinct-like aggregations. An aggregation is distinct-like if
+ * adding distinct would not change its result, for example min and max.
+ *
+ * We cannot apply the optimization without a group by, because the results depend
+ * on the number of rows in the partitions; for example, count(hr) counts all
+ * rows in the matching partitions.
+ * 
+ */
+public class MetadataOnlyOptimizer implements PhysicalPlanResolver {
+  private static final Log LOG = LogFactory.getLog(MetadataOnlyOptimizer.class.getName());
+
+  static private class WalkerCtx implements NodeProcessorCtx {
+    /* operators for which there is chance the optimization can be applied */
+    private final HashSet<TableScanOperator> possible = new HashSet<TableScanOperator>();
+    /* operators for which the optimization will be successful */
+    private final HashSet<TableScanOperator> success = new HashSet<TableScanOperator>();
+
+    /**
+     * Marks the operator as one to which the optimization might be applicable.
+     * 
+     * @param op
+     *          the operator
+     */
+    public void setMayBeMetadataOnly(TableScanOperator op) {
+      possible.add(op);
+    }
+
+    /** Convert all possible operators to success */
+    public void convertMetadataOnly() {
+      success.addAll(possible);
+      possible.clear();
+    }
+
+    /**
+     * Discards all collected operators (both candidates and successes); the optimization will not be applied to them.
+     */
+    public void convertNotMetadataOnly() {
+      possible.clear();
+      success.clear();
+    }
+
+    /**
+     * Returns HashSet of collected operators for which the optimization may be
+     * applicable.
+     */
+    public HashSet<TableScanOperator> getMayBeMetadataOnlyTableScans() {
+      return possible;
+    }
+
+    /**
+     * Returns HashSet of collected operators for which the optimization is
+     * applicable.
+     */
+    public HashSet<TableScanOperator> getMetadataOnlyTableScans() {
+      return success;
+    }
+
+  }
+
+  static private class TableScanProcessor implements NodeProcessor {
+    public TableScanProcessor() {
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator node = (TableScanOperator) nd;
+      WalkerCtx walkerCtx = (WalkerCtx) procCtx;
+      if (((node.getNeededColumnIDs() == null) || (node.getNeededColumnIDs().size() == 0))
+          && ((node.getConf() == null) || 
+              (node.getConf().getVirtualCols() == null) || 
+              (node.getConf().getVirtualCols().isEmpty()))) {
+        walkerCtx.setMayBeMetadataOnly(node);
+      }
+      return nd;
+    }
+  }
+
+  static private class FileSinkProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      WalkerCtx walkerCtx = (WalkerCtx) procCtx;
+      // There can be at most one element eligible to be converted to
+      // metadata only
+      if ((walkerCtx.getMayBeMetadataOnlyTableScans().isEmpty())
+          || (walkerCtx.getMayBeMetadataOnlyTableScans().size() > 1)) {
+        return nd;
+      }
+
+      for (Node op : stack) {
+        if (op instanceof GroupByOperator) {
+          GroupByOperator gby = (GroupByOperator) op;
+          if (!gby.getConf().isDistinctLike()) {
+            // GroupBy not distinct like, disabling
+            walkerCtx.convertNotMetadataOnly();
+            return nd;
+          }
+        }
+      }
+
+      walkerCtx.convertMetadataOnly();
+      return nd;
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Dispatcher disp = new MetadataOnlyTaskDispatcher(pctx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.rootTasks);
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  /**
+   * Iterate over all tasks one-to-one and convert them to metadata only
+   */
+  class MetadataOnlyTaskDispatcher implements Dispatcher {
+
+    private PhysicalContext physicalContext;
+
+    public MetadataOnlyTaskDispatcher(PhysicalContext context) {
+      super();
+      physicalContext = context;
+    }
+
+    private String getAliasForTableScanOperator(MapredWork work,
+        TableScanOperator tso) {
+
+      for (Map.Entry<String, Operator<? extends Serializable>> entry : work.getAliasToWork().entrySet()) {
+        if (entry.getValue() == tso) {
+          return entry.getKey();
+        }
+      }
+
+      return null;
+    }
+
+    private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc) {
+      if (desc != null) {
+        desc.setInputFileFormatClass(OneNullRowInputFormat.class);
+        desc.setDeserializerClass(NullStructSerDe.class);
+        desc.setSerdeClassName(NullStructSerDe.class.getName());
+      }
+      return desc;
+    }
+
+    private List<String> getPathsForAlias(MapredWork work, String alias) {
+      List<String> paths = new ArrayList<String>();
+
+      for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
+        if (entry.getValue().contains(alias)) {
+          paths.add(entry.getKey());
+        }
+      }
+
+      return paths;
+    }
+
+    private void processAlias(MapredWork work, String alias) {
+      // Change the alias partition desc
+      PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
+      changePartitionToMetadataOnly(aliasPartn);
+
+      List<String> paths = getPathsForAlias(work, alias);
+      for (String path : paths) {
+        PartitionDesc newPartition = changePartitionToMetadataOnly(work.getPathToPartitionInfo().get(
+            path));
+        Path fakePath = new Path("file", null,
+            "/fake-path-metadata-only-query-" + newPartition.getTableName()
+                + newPartition.getPartSpec().toString());
+        work.getPathToPartitionInfo().remove(path);
+        work.getPathToPartitionInfo().put(fakePath.getName(), newPartition);
+        ArrayList<String> aliases = work.getPathToAliases().remove(path);
+        work.getPathToAliases().put(fakePath.getName(), aliases);
+      }
+    }
+
+    private void convertToMetadataOnlyQuery(MapredWork work,
+        TableScanOperator tso) {
+      String alias = getAliasForTableScanOperator(work, tso);
+      processAlias(work, alias);
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
+
+      Collection<Operator<? extends Serializable>> topOperators 
+        = task.getTopOperators();
+      if (topOperators.size() == 0) {
+        return null;
+      }
+
+      LOG.info("Looking for table scans where optimization is applicable");
+      // create the context for walking operators
+      ParseContext parseContext = physicalContext.getParseContext();
+      WalkerCtx walkerCtx = new WalkerCtx();
+
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", "TS%"), new TableScanProcessor());
+      opRules.put(new RuleRegExp("R2", "GBY%.*FS%"), new FileSinkProcessor());
+
+      // The dispatcher fires the processor corresponding to the closest
+      // matching rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(null, opRules, walkerCtx);
+      GraphWalker ogw = new PreOrderWalker(disp);
+
+      // Create a list of topOp nodes
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      // Get the top Nodes for this map-reduce task
+      for (Operator<? extends Serializable> 
+           workOperator : topOperators) {
+        if (parseContext.getTopOps().values().contains(workOperator)) {
+          topNodes.add(workOperator);
+        }
+      }
+
+      if (task.getReducer() != null) {
+        topNodes.add(task.getReducer());
+      }
+      
+      ogw.startWalking(topNodes, null);
+
+      LOG.info(String.format("Found %d metadata only table scans",
+          walkerCtx.getMetadataOnlyTableScans().size()));
+      Iterator<TableScanOperator> iterator 
+        = walkerCtx.getMetadataOnlyTableScans().iterator();
+
+      while (iterator.hasNext()) {
+        TableScanOperator tso = iterator.next();
+        LOG.info("Metadata only table scan for " + tso.getConf().getAlias());
+        convertToMetadataOnlyQuery((MapredWork) task.getWork(), tso);
+      }
+
+      return null;
+    }
+  }
+}
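
To make the class comment above concrete, a few query shapes from the new metadataonly1.q test (ds is a partition column of TEST1); this is an illustrative sketch of how the distinct-like rule plays out, not an exhaustive list:

    -- eligible: max is distinct-like and only the partition column ds is read
    select max(ds) from TEST1;
    -- eligible: an explicit distinct aggregation over a partition column
    select count(distinct ds) from TEST1;
    -- not eligible: count(ds) depends on how many rows each matching partition holds
    select count(ds) from TEST1;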

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Mon Oct 31 17:30:21 2011
@@ -56,6 +56,9 @@ public class PhysicalOptimizer {
       resolvers.add(new IndexWhereResolver());
     }
     resolvers.add(new MapJoinResolver());
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
+      resolvers.add(new MetadataOnlyOptimizer());
+    }
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Mon Oct 31 17:30:21 2011
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
 /**
  * GroupByDesc.
  *
@@ -175,4 +180,22 @@ public class GroupByDesc implements java
   public void setBucketGroup(boolean dataSorted) {
     bucketGroup = dataSorted;
   }
+  
+  /**
+   * Checks whether this group by is distinct-like, meaning that all of its non-distinct
+   * aggregations behave as if their inputs were distinct - for example min and max.
+   */
+  public boolean isDistinctLike() {
+    ArrayList<AggregationDesc> aggregators = getAggregators();
+    for(AggregationDesc ad: aggregators){
+      if(!ad.getDistinct()) {
+        GenericUDAFEvaluator udafEval = ad.getGenericUDAFEvaluator();
+        UDFType annot = udafEval.getClass().getAnnotation(UDFType.class);
+        if(annot == null || !annot.distinctLike()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }
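
A hedged illustration of what isDistinctLike() distinguishes, using the TEST2 table from the new metadataonly1.q test (ds and hr are its partition columns); the first query is not literally in the test but follows the same shape:

    -- min and max are annotated distinctLike, so isDistinctLike() returns true
    select ds, max(hr), min(hr) from TEST2 group by ds;
    -- count(hr) without distinct is not distinct-like, so isDistinctLike() returns false
    select ds, count(hr) from TEST2 group by ds;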

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java Mon Oct 31 17:30:21 2011
@@ -33,4 +33,5 @@ import java.lang.annotation.Target;
 public @interface UDFType {
   boolean deterministic() default true;
   boolean stateful() default false;
+  boolean distinctLike() default false;
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java Mon Oct 31 17:30:21 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -49,6 +50,7 @@ public class GenericUDAFMax extends Abst
     return new GenericUDAFMaxEvaluator();
   }
 
+  @UDFType(distinctLike=true)
   public static class GenericUDAFMaxEvaluator extends GenericUDAFEvaluator {
 
     ObjectInspector inputOI;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java?rev=1195577&r1=1195576&r2=1195577&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java Mon Oct 31 17:30:21 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -49,6 +50,7 @@ public class GenericUDAFMin extends Abst
     return new GenericUDAFMinEvaluator();
   }
 
+  @UDFType(distinctLike=true)
   public static class GenericUDAFMinEvaluator extends GenericUDAFEvaluator {
 
     ObjectInspector inputOI;

Added: hive/trunk/ql/src/test/queries/clientpositive/metadataonly1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/metadataonly1.q?rev=1195577&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/metadataonly1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/metadataonly1.q Mon Oct 31 17:30:21 2011
@@ -0,0 +1,35 @@
+CREATE TABLE TEST1(A INT, B DOUBLE) partitioned by (ds string);
+explain extended select max(ds) from TEST1;
+select max(ds) from TEST1;
+
+alter table TEST1 add partition (ds='1');
+explain extended select max(ds) from TEST1;
+select max(ds) from TEST1;
+
+explain extended select count(distinct ds) from TEST1;
+select count(distinct ds) from TEST1;
+
+explain extended select count(ds) from TEST1;
+select count(ds) from TEST1;
+
+alter table TEST1 add partition (ds='2');
+explain extended 
+select count(*) from TEST1 a2 join (select max(ds) m from TEST1) b on a2.ds=b.m;
+select count(*) from TEST1 a2 join (select max(ds) m from TEST1) b on a2.ds=b.m;
+
+
+CREATE TABLE TEST2(A INT, B DOUBLE) partitioned by (ds string, hr string);
+alter table TEST2 add partition (ds='1', hr='1');
+alter table TEST2 add partition (ds='1', hr='2');
+alter table TEST2 add partition (ds='1', hr='3');
+
+explain extended select ds, count(distinct hr) from TEST2 group by ds;
+select ds, count(distinct hr) from TEST2 group by ds;
+
+explain extended select ds, count(hr) from TEST2 group by ds;
+select ds, count(hr) from TEST2 group by ds;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+
+explain extended select max(ds) from TEST1;
+select max(ds) from TEST1;