Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC

svn commit: r1635536 [7/28] - in /hive/branches/spark: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoo...

Modified: hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java (original)
+++ hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java Thu Oct 30 16:22:33 2014
@@ -23,13 +23,16 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
@@ -60,52 +63,70 @@ public class DummyListener extends MetaS
 
   @Override
   public void onConfigChange(ConfigChangeEvent configChange) {
-    notifyList.add(configChange);
+    addEvent(configChange);
   }
 
   @Override
   public void onAddPartition(AddPartitionEvent partition) throws MetaException {
-    notifyList.add(partition);
+    addEvent(partition);
   }
 
   @Override
   public void onCreateDatabase(CreateDatabaseEvent db) throws MetaException {
-    notifyList.add(db);
+    addEvent(db);
   }
 
   @Override
   public void onCreateTable(CreateTableEvent table) throws MetaException {
-    notifyList.add(table);
+    addEvent(table);
   }
 
   @Override
   public void onDropDatabase(DropDatabaseEvent db) throws MetaException {
-    notifyList.add(db);
+    addEvent(db);
   }
 
   @Override
   public void onDropPartition(DropPartitionEvent partition) throws MetaException {
-    notifyList.add(partition);
+    addEvent(partition);
   }
 
   @Override
   public void onDropTable(DropTableEvent table) throws MetaException {
-    notifyList.add(table);
+    addEvent(table);
   }
 
   @Override
   public void onAlterTable(AlterTableEvent event) throws MetaException {
-    notifyList.add(event);
+    addEvent(event);
   }
 
   @Override
   public void onAlterPartition(AlterPartitionEvent event) throws MetaException {
-    notifyList.add(event);
+    addEvent(event);
   }
 
   @Override
   public void onLoadPartitionDone(LoadPartitionDoneEvent partEvent) throws MetaException {
-    notifyList.add(partEvent);
+    addEvent(partEvent);
   }
 
+  @Override
+  public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
+    addEvent(indexEvent);
+  }
+
+  @Override
+  public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
+    addEvent(indexEvent);
+  }
+
+  @Override
+  public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
+    addEvent(indexEvent);
+  }
+
+  private void addEvent(ListenerEvent event) {
+    notifyList.add(event);
+  }
 }
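
A minimal, self-contained sketch (illustrative only, not part of this commit) of the pattern introduced above: every listener callback funnels through one private helper so a test can assert against a single ordered event list. The class and method names below are invented for the example.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for a metastore event listener used in tests.
    public class RecordingListener {

      // Events in arrival order; a test inspects this after driving the metastore.
      private final List<Object> events = new ArrayList<Object>();

      public void onCreateTable(Object tableEvent) { record(tableEvent); }
      public void onDropTable(Object tableEvent)   { record(tableEvent); }
      public void onAddIndex(Object indexEvent)    { record(indexEvent); }

      // Single collection point, mirroring addEvent() in the diff above.
      private void record(Object event) {
        events.add(event);
      }

      public List<Object> getEvents() {
        return events;
      }

      public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onCreateTable("create-table-event");
        listener.onAddIndex("add-index-event");
        System.out.println(listener.getEvents()); // [create-table-event, add-index-event]
      }
    }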

Modified: hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java (original)
+++ hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java Thu Oct 30 16:22:33 2014
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.
  *
  * DummyPreListener.
  *
- * An implemenation of MetaStorePreEventListener which stores the Events it's seen in a list.
+ * An implementation of MetaStorePreEventListener which stores the Events it's seen in a list.
  */
 public class DummyPreListener extends MetaStorePreEventListener {
 

Modified: hive/branches/spark/odbc/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/odbc/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/odbc/pom.xml (original)
+++ hive/branches/spark/odbc/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

Modified: hive/branches/spark/packaging/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/packaging/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/packaging/pom.xml (original)
+++ hive/branches/spark/packaging/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Thu Oct 30 16:22:33 2014
@@ -21,7 +21,7 @@
   </parent>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive</artifactId>
-  <version>0.14.0-SNAPSHOT</version>
+  <version>0.15.0-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <name>Hive</name>
@@ -53,7 +53,7 @@
   </modules>
 
   <properties>
-    <hive.version.shortname>0.14.0</hive.version.shortname>
+    <hive.version.shortname>0.15.0</hive.version.shortname>
 
     <!-- Build Properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -133,7 +133,7 @@
     <jms.version>1.1</jms.version>
     <jodd.version>3.5.2</jodd.version>
     <json.version>20090211</json.version>
-    <junit.version>4.10</junit.version>
+    <junit.version>4.11</junit.version>
     <kryo.version>2.22</kryo.version>
     <libfb303.version>0.9.0</libfb303.version>
     <libthrift.version>0.9.0</libthrift.version>
@@ -152,7 +152,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.1</tez.version>
+    <tez.version>0.5.2-SNAPSHOT</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <spark.version>1.2.0-SNAPSHOT</spark.version>
     <scala.binary.version>2.10</scala.binary.version>
@@ -165,6 +165,7 @@
     <zookeeper.version>3.4.6</zookeeper.version>
     <jpam.version>1.1</jpam.version>
     <felix.version>2.4.0</felix.version>
+    <curator.version>2.6.0</curator.version>
   </properties>
 
   <repositories>
@@ -486,7 +487,13 @@
           </exclusion>
         </exclusions>
       </dependency>
-      <dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <dependency>
         <groupId>org.codehaus.groovy</groupId>
         <artifactId>groovy-all</artifactId>
         <version>${groovy.version}</version>

Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -28,7 +28,7 @@
   <name>Hive Query Language</name>
 
   <properties>
-    <optiq.version>0.9.1-incubating-SNAPSHOT</optiq.version>
+    <calcite.version>0.9.2-incubating-SNAPSHOT</calcite.version>
     <hive.path.to.root>..</hive.path.to.root>
   </properties>
 
@@ -183,9 +183,9 @@
       <version>${datanucleus-core.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.optiq</groupId>
-      <artifactId>optiq-core</artifactId>
-      <version>${optiq.version}</version>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
           in hive's use of datanucleus. 
@@ -201,9 +201,9 @@
       </exclusions>
     </dependency>   
     <dependency>
-      <groupId>org.apache.optiq</groupId>
-      <artifactId>optiq-avatica</artifactId>
-      <version>${optiq.version}</version>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-avatica</artifactId>
+      <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
           in hive's use of datanucleus. 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Oct 30 16:22:33 2014
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -69,7 +72,6 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
@@ -409,6 +411,8 @@ public class Driver implements CommandPr
         HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
         hookCtx.setConf(conf);
         hookCtx.setUserName(userName);
+        hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
+        hookCtx.setCommand(command);
         for (HiveSemanticAnalyzerHook hook : saHooks) {
           tree = hook.preAnalyze(hookCtx, tree);
         }
@@ -465,6 +469,13 @@ public class Driver implements CommandPr
         }
       }
 
+      if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
+        String explainOutput = getExplainOutput(sem, plan, tree.dump());
+        if (explainOutput != null) {
+          LOG.info("EXPLAIN output: " + explainOutput);
+        }
+      }
+
       return 0;
     } catch (Exception e) {
       ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
@@ -492,6 +503,33 @@ public class Driver implements CommandPr
   }
 
   /**
+   * Returns EXPLAIN EXTENDED output for a semantically
+   * analyzed query.
+   *
+   * @param sem semantic analyzer for analyzed query
+   * @param plan query plan
+   * @param astStringTree AST tree dump
+   * @throws java.io.IOException
+   */
+  private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan,
+      String astStringTree) throws IOException {
+    String ret = null;
+    ExplainTask task = new ExplainTask();
+    task.initialize(conf, plan, null);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    try {
+      task.getJSONPlan(ps, astStringTree, sem.getRootTasks(), sem.getFetchTask(),
+          false, true, true);
+      ret = baos.toString();
+    } catch (Exception e) {
+      LOG.warn("Exception generating explain output: " + e, e);
+    }
+
+    return ret;
+  }
+
+  /**
    * Do authorization using post semantic analysis information in the semantic analyzer
    * The original command is also passed so that authorization interface can provide
    * more useful information in logs.
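
The new getExplainOutput() above renders the plan by pointing a PrintStream at an in-memory buffer and then reading the buffer back as a String. A minimal sketch of that capture idiom, with a placeholder line standing in for task.getJSONPlan() (generic names, not Hive APIs):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class ExplainCapture {

      // Capture whatever a component writes to a PrintStream as a String.
      static String capture() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer);
        out.println("STAGE DEPENDENCIES: ...");  // stand-in for task.getJSONPlan(out, ...)
        out.flush();
        return buffer.toString();
      }

      public static void main(String[] args) {
        System.out.println(capture());
      }
    }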

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Thu Oct 30 16:22:33 2014
@@ -421,6 +421,8 @@ public enum ErrorMsg {
       "an AcidOutputFormat or is not bucketed", true),
   ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
       "sorted, table {0}", true),
+  ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,
+      "Alter table partition type {0} does not allow partial partition spec", true),
 
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Thu Oct 30 16:22:33 2014
@@ -208,6 +208,7 @@ public class ColumnInfo implements Seria
       return false;
     }
 
+    // TODO: why does this not compare tabAlias?
     ColumnInfo dest = (ColumnInfo)obj;
     if ((!checkEquals(internalName, dest.getInternalName())) ||
         (!checkEquals(alias, dest.getAlias())) ||
@@ -221,6 +222,18 @@ public class ColumnInfo implements Seria
     return true;
   }
 
+  public boolean isSameColumnForRR(ColumnInfo other) {
+    return checkEquals(tabAlias, other.tabAlias)
+        && checkEquals(alias, other.alias)
+        && checkEquals(internalName, other.internalName)
+        && checkEquals(getType(), other.getType());
+  }
+
+  public String toMappingString(String tab, String col) {
+    return tab + "." + col + " => {" + tabAlias + ", " + alias + ", "
+        + internalName + ": " + getType() + "}";
+  }
+
   public void setObjectinspector(ObjectInspector writableObjectInspector) {
     this.objectInspector = writableObjectInspector;
   }
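
The new isSameColumnForRR() above builds on a checkEquals() helper; assuming it is the usual null-safe equality check, the idiom it relies on looks like this (a sketch, not the actual ColumnInfo code):

    public class NullSafeEquals {

      // Null-safe equality: two nulls compare equal, null never equals non-null.
      static boolean checkEquals(Object a, Object b) {
        return (a == null) ? (b == null) : a.equals(b);
      }

      public static void main(String[] args) {
        System.out.println(checkEquals(null, null)); // true
        System.out.println(checkEquals("t1", null)); // false
        System.out.println(checkEquals("t1", "t1")); // true
      }
    }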

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -64,7 +62,6 @@ public class CommonMergeJoinOperator ext
   private static final long serialVersionUID = 1L;
   private boolean isBigTableWork;
   private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName());
-  private Map<Integer, String> aliasToInputNameMap;
   transient List<Object>[] keyWritables;
   transient List<Object>[] nextKeyWritables;
   transient RowContainer<List<Object>>[] nextGroupStorage;
@@ -309,6 +306,8 @@ public class CommonMergeJoinOperator ext
   public void closeOp(boolean abort) throws HiveException {
     joinFinalLeftData();
 
+    super.closeOp(abort);
+
     // clean up
     for (int pos = 0; pos < order.length; pos++) {
       if (pos != posBigTable) {
@@ -365,6 +364,9 @@ public class CommonMergeJoinOperator ext
       joinOneGroup();
       dataInCache = false;
       for (byte pos = 0; pos < order.length; pos++) {
+        if (candidateStorage[pos] == null) {
+          continue;
+        }
         if (this.candidateStorage[pos].rowCount() > 0) {
           dataInCache = true;
           break;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Thu Oct 30 16:22:33 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
 import org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
@@ -153,8 +154,10 @@ import org.apache.hadoop.hive.ql.securit
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveV1Authorizer;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -183,11 +186,13 @@ import java.io.Serializable;
 import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -778,7 +783,7 @@ public class DDLTask extends Task<DDLWor
   }
 
   private int dropIndex(Hive db, DropIndexDesc dropIdx) throws HiveException {
-    db.dropIndex(dropIdx.getTableName(), dropIdx.getIndexName(), true);
+    db.dropIndex(dropIdx.getTableName(), dropIdx.getIndexName(), dropIdx.isThrowException(), true);
     return 0;
   }
 
@@ -2201,7 +2206,7 @@ public class DDLTask extends Task<DDLWor
    *           Throws this exception if an unexpected error occurs.
    */
   private int showTables(Hive db, ShowTablesDesc showTbls) throws HiveException {
-    // get the tables for the desired pattenn - populate the output stream
+    // get the tables for the desired pattern - populate the output stream
     List<String> tbls = null;
     String dbName = showTbls.getDbName();
 
@@ -2278,7 +2283,12 @@ public class DDLTask extends Task<DDLWor
     Set<String> funcs = null;
     if (showFuncs.getPattern() != null) {
       LOG.info("pattern: " + showFuncs.getPattern());
-      funcs = FunctionRegistry.getFunctionNames(showFuncs.getPattern());
+      if (showFuncs.getIsLikePattern()) {
+         funcs = FunctionRegistry.getFunctionNamesByLikePattern(showFuncs.getPattern());
+      } else {
+         console.printInfo("SHOW FUNCTIONS is deprecated, please use SHOW FUNCTIONS LIKE instead.");
+         funcs = FunctionRegistry.getFunctionNames(showFuncs.getPattern());
+      }
       LOG.info("results : " + funcs.size());
     } else {
       funcs = FunctionRegistry.getFunctionNames();
@@ -2802,7 +2812,7 @@ public class DDLTask extends Task<DDLWor
    *          is the function we are describing
    * @throws HiveException
    */
-  private int describeFunction(DescFunctionDesc descFunc) throws HiveException {
+  private int describeFunction(DescFunctionDesc descFunc) throws HiveException, SQLException {
     String funcName = descFunc.getName();
 
     // write the results in the file
@@ -3083,6 +3093,15 @@ public class DDLTask extends Task<DDLWor
 
       List<FieldSchema> cols = null;
       List<ColumnStatisticsObj> colStats = null;
+
+      Deserializer deserializer = tbl.getDeserializer(true);
+      if (deserializer instanceof AbstractSerDe) {
+        String errorMsgs = ((AbstractSerDe) deserializer).getConfigurationErrors();
+        if (errorMsgs != null && !errorMsgs.isEmpty()) {
+          throw new SQLException(errorMsgs);
+        }
+      }
+
       if (colPath.equals(tableName)) {
         cols = (part == null || tbl.getTableType() == TableType.VIRTUAL_VIEW) ?
             tbl.getCols() : part.getCols();
@@ -3091,7 +3110,7 @@ public class DDLTask extends Task<DDLWor
           cols.addAll(tbl.getPartCols());
         }
       } else {
-        cols = Hive.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+        cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
         if (descTbl.isFormatted()) {
           // when column name is specified in describe table DDL, colPath will
           // will be table_name.column_name
@@ -3121,6 +3140,8 @@ public class DDLTask extends Task<DDLWor
       outStream.close();
       outStream = null;
 
+    } catch (SQLException e) {
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
     } catch (IOException e) {
       throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
     } finally {
@@ -3259,22 +3280,77 @@ public class DDLTask extends Task<DDLWor
     // alter the table
     Table tbl = db.getTable(alterTbl.getOldName());
 
-    Partition part = null;
     List<Partition> allPartitions = null;
     if (alterTbl.getPartSpec() != null) {
-      if (alterTbl.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) {
-        part = db.getPartition(tbl, alterTbl.getPartSpec(), false);
+      Map<String, String> partSpec = alterTbl.getPartSpec(); 
+      if (DDLSemanticAnalyzer.isFullSpec(tbl, partSpec)) {
+        allPartitions = new ArrayList<Partition>();
+        Partition part = db.getPartition(tbl, partSpec, false);
         if (part == null) {
+          // User provided a fully specified partition spec but it doesn't exist, fail.
           throw new HiveException(ErrorMsg.INVALID_PARTITION,
-              StringUtils.join(alterTbl.getPartSpec().keySet(), ',') + " for table " + alterTbl.getOldName());
+                StringUtils.join(alterTbl.getPartSpec().keySet(), ',') + " for table " + alterTbl.getOldName());
+
         }
-      }
-      else {
+        allPartitions.add(part);
+      } else {
+        // DDLSemanticAnalyzer has already checked if partial partition specs are allowed,
+        // thus we should not need to check it here.
         allPartitions = db.getPartitions(tbl, alterTbl.getPartSpec());
       }
     }
 
     Table oldTbl = tbl.copy();
+    if (allPartitions != null) {
+      // Alter all partitions
+      for (Partition part : allPartitions) {
+        alterTableOrSinglePartition(alterTbl, tbl, part);
+      }
+    } else {
+      // Just alter the table
+      alterTableOrSinglePartition(alterTbl, tbl, null);
+    }
+
+    if (allPartitions == null) {
+      updateModifiedParameters(tbl.getTTable().getParameters(), conf);
+      tbl.checkValidity();
+    } else {
+      for (Partition tmpPart: allPartitions) {
+        updateModifiedParameters(tmpPart.getParameters(), conf);
+      }
+    }
+
+    try {
+      if (allPartitions == null) {
+        db.alterTable(alterTbl.getOldName(), tbl);
+      } else {
+        db.alterPartitions(tbl.getTableName(), allPartitions);
+      }
+    } catch (InvalidOperationException e) {
+      LOG.error("alter table: " + stringifyException(e));
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
+    }
+
+    // This is kind of hacky - the read entity contains the old table, whereas
+    // the write entity
+    // contains the new table. This is needed for rename - both the old and the
+    // new table names are
+    // passed
+    // Don't acquire locks for any of these, we have already asked for them in DDLSemanticAnalyzer.
+    if (allPartitions != null ) {
+      for (Partition tmpPart: allPartitions) {
+        work.getInputs().add(new ReadEntity(tmpPart));
+        work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
+      }
+    } else {
+      work.getInputs().add(new ReadEntity(oldTbl));
+      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    }
+    return 0;
+  }
+
+  private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part)
+      throws HiveException {
     List<FieldSchema> oldCols = (part == null ? tbl.getCols() : part.getCols());
     StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
 
@@ -3420,12 +3496,10 @@ public class DDLTask extends Task<DDLWor
       AlterTableDesc.ProtectModeType protectMode = alterTbl.getProtectModeType();
 
       ProtectMode mode = null;
-      if (allPartitions != null) {
-        for (Partition tmpPart: allPartitions) {
-          mode = tmpPart.getProtectMode();
-          setAlterProtectMode(protectModeEnable, protectMode, mode);
-          tmpPart.setProtectMode(mode);
-        }
+      if (part != null) {
+        mode = part.getProtectMode();
+        setAlterProtectMode(protectModeEnable, protectMode, mode);
+        part.setProtectMode(mode);
       } else {
         mode = tbl.getProtectMode();
         setAlterProtectMode(protectModeEnable,protectMode, mode);
@@ -3468,12 +3542,12 @@ public class DDLTask extends Task<DDLWor
         throw new HiveException(e);
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSKEWEDBY) {
-      /* Validation's been done at compile time. no validation is needed here. */
+      // Validation's been done at compile time. no validation is needed here.
       List<String> skewedColNames = null;
       List<List<String>> skewedValues = null;
 
       if (alterTbl.isTurnOffSkewed()) {
-        /* Convert skewed table to non-skewed table. */
+        // Convert skewed table to non-skewed table.
         skewedColNames = new ArrayList<String>();
         skewedValues = new ArrayList<List<String>>();
       } else {
@@ -3482,7 +3556,7 @@ public class DDLTask extends Task<DDLWor
       }
 
       if ( null == tbl.getSkewedInfo()) {
-        /* Convert non-skewed table to skewed table. */
+        // Convert non-skewed table to skewed table.
         SkewedInfo skewedInfo = new SkewedInfo();
         skewedInfo.setSkewedColNames(skewedColNames);
         skewedInfo.setSkewedColValues(skewedValues);
@@ -3524,59 +3598,12 @@ public class DDLTask extends Task<DDLWor
         }
         tbl.setNumBuckets(alterTbl.getNumberBuckets());
       }
-   } else {
+    } else {
       throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString());
     }
 
-    if (part == null && allPartitions == null) {
-      updateModifiedParameters(tbl.getTTable().getParameters(), conf);
-      tbl.checkValidity();
-    } else if (part != null) {
-      updateModifiedParameters(part.getParameters(), conf);
-    }
-    else {
-      for (Partition tmpPart: allPartitions) {
-        updateModifiedParameters(tmpPart.getParameters(), conf);
-      }
-    }
-
-    try {
-      if (part == null && allPartitions == null) {
-        db.alterTable(alterTbl.getOldName(), tbl);
-      } else if (part != null) {
-        db.alterPartition(tbl.getTableName(), part);
-      }
-      else {
-        db.alterPartitions(tbl.getTableName(), allPartitions);
-      }
-    } catch (InvalidOperationException e) {
-      LOG.info("alter table: " + stringifyException(e));
-      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
-    }
-
-    // This is kind of hacky - the read entity contains the old table, whereas
-    // the write entity
-    // contains the new table. This is needed for rename - both the old and the
-    // new table names are
-    // passed
-    // Don't acquire locks for any of these, we have already asked for them in DDLSemanticAnalyzer.
-    if(part != null) {
-      work.getInputs().add(new ReadEntity(part));
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
-    }
-    else if (allPartitions != null ){
-      for (Partition tmpPart: allPartitions) {
-        work.getInputs().add(new ReadEntity(tmpPart));
-        work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
-      }
-    }
-    else {
-      work.getInputs().add(new ReadEntity(oldTbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
-    }
     return 0;
   }
-
   /**
    * Drop a given table or some partitions. DropTableDesc is currently used for both.
    *
@@ -3896,7 +3923,7 @@ public class DDLTask extends Task<DDLWor
     tbl.setInputFormatClass(crtTbl.getInputFormat());
     tbl.setOutputFormatClass(crtTbl.getOutputFormat());
 
-    // only persist input/ouput format to metadata when it is explicitly specified.
+    // only persist input/output format to metadata when it is explicitly specified.
     // Otherwise, load lazily via StorageHandler at query time.
     if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) {
       tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
@@ -3965,7 +3992,7 @@ public class DDLTask extends Task<DDLWor
    * @throws HiveException
    *           Throws this exception if an unexpected error occurs.
    */
-  private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws HiveException {
+  private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws Exception {
     // Get the existing table
     Table oldtbl = db.getTable(crtTbl.getLikeTableName());
     Table tbl;
@@ -4031,12 +4058,22 @@ public class DDLTask extends Task<DDLWor
         tbl.unsetDataLocation();
       }
 
+      Class<? extends Deserializer> serdeClass = oldtbl.getDeserializerClass();
+
       Map<String, String> params = tbl.getParameters();
       // We should copy only those table parameters that are specified in the config.
+      SerDeSpec spec = AnnotationUtils.getAnnotation(serdeClass, SerDeSpec.class);
       String paramsStr = HiveConf.getVar(conf, HiveConf.ConfVars.DDL_CTL_PARAMETERS_WHITELIST);
+
+      Set<String> retainer = new HashSet<String>();
+      if (spec != null && spec.schemaProps() != null) {
+        retainer.addAll(Arrays.asList(spec.schemaProps()));
+      }
       if (paramsStr != null) {
-        List<String> paramsList = Arrays.asList(paramsStr.split(","));
-        params.keySet().retainAll(paramsList);
+        retainer.addAll(Arrays.asList(paramsStr.split(",")));
+      }
+      if (!retainer.isEmpty()) {
+        params.keySet().retainAll(retainer);
       } else {
         params.clear();
       }
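
In the createTableLike() change above, the retained table parameters are the union of the SerDe's declared schema properties and the configured whitelist, applied via keySet().retainAll(). A small standalone sketch of that idiom (the property names below are invented for the example):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class RetainWhitelist {
      public static void main(String[] args) {
        Map<String, String> params = new HashMap<String, String>();
        params.put("serde.schema.prop", "1");   // hypothetical SerDe schema property
        params.put("whitelisted.prop", "keep"); // hypothetical whitelist entry
        params.put("other.prop", "drop");

        Set<String> retainer = new HashSet<String>();
        retainer.addAll(Arrays.asList("serde.schema.prop"));
        retainer.addAll(Arrays.asList("whitelisted.prop"));

        // Removing keys from the key-set view removes the entries from the map itself.
        params.keySet().retainAll(retainer);
        System.out.println(params); // other.prop is gone; iteration order may vary
      }
    }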

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Oct 30 16:22:33 2014
@@ -30,7 +30,7 @@ import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -66,6 +66,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -219,27 +220,32 @@ public class FetchOperator implements Se
   /**
    * A cache of InputFormat instances.
    */
-  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
+  private static final Map<String, InputFormat> inputFormats = new HashMap<String, InputFormat>();
 
   @SuppressWarnings("unchecked")
-  static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
-      Configuration conf) throws IOException {
-    if (!inputFormats.containsKey(inputFormatClass)) {
+  static InputFormat getInputFormatFromCache(Class<? extends InputFormat> inputFormatClass,
+       JobConf conf) throws IOException {
+    if (Configurable.class.isAssignableFrom(inputFormatClass) ||
+        JobConfigurable.class.isAssignableFrom(inputFormatClass)) {
+      return (InputFormat<WritableComparable, Writable>) ReflectionUtils
+          .newInstance(inputFormatClass, conf);
+    }
+    InputFormat format = inputFormats.get(inputFormatClass.getName());
+    if (format == null) {
       try {
-        InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
-            .newInstance(inputFormatClass, conf);
-        inputFormats.put(inputFormatClass, newInstance);
+        format = ReflectionUtils.newInstance(inputFormatClass, conf);
+        inputFormats.put(inputFormatClass.getName(), format);
       } catch (Exception e) {
         throw new IOException("Cannot create an instance of InputFormat class "
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return inputFormats.get(inputFormatClass);
+    return format;
   }
 
   private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
     Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
+    SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
     return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
   }
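
The cache rework above stops caching Configurable and JobConfigurable input formats. ReflectionUtils.newInstance() pushes the supplied configuration into such objects when it creates them, so a cached instance would keep the configuration of the first job that asked for it. A hedged sketch of the resulting cache shape (names are illustrative, not the Hive code):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class InputFormatCache {

      @SuppressWarnings("rawtypes")
      private static final Map<String, InputFormat> CACHE = new HashMap<String, InputFormat>();

      @SuppressWarnings("rawtypes")
      static InputFormat get(Class<? extends InputFormat> clazz, JobConf conf) {
        // Conf-sensitive formats get a fresh, conf-specific instance every time;
        // caching one would freeze the first caller's configuration.
        if (Configurable.class.isAssignableFrom(clazz)
            || JobConfigurable.class.isAssignableFrom(clazz)) {
          return ReflectionUtils.newInstance(clazz, conf);
        }
        InputFormat format = CACHE.get(clazz.getName());
        if (format == null) {
          format = ReflectionUtils.newInstance(clazz, conf);
          CACHE.put(clazz.getName(), format);
        }
        return format;
      }
    }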
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Oct 30 16:22:33 2014
@@ -109,6 +109,15 @@ public class FileSinkOperator extends Te
   private StructField bucketField; // field bucket is in in record id
   private StructObjectInspector recIdInspector; // OI for inspecting record id
   private IntObjectInspector bucketInspector; // OI for inspecting bucket id
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+
+  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT
+  }
 
   /**
    * RecordWriter.
@@ -249,7 +258,7 @@ public class FileSinkOperator extends Te
   private static final long serialVersionUID = 1L;
   protected transient FileSystem fs;
   protected transient Serializer serializer;
-  protected transient LongWritable row_count;
+  protected final transient LongWritable row_count = new LongWritable();
   private transient boolean isNativeTable = true;
 
   /**
@@ -352,7 +361,7 @@ public class FileSinkOperator extends Te
         prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
             jc.getPartitionerClass(), null);
       }
-      row_count = new LongWritable();
+
       if (dpCtx != null) {
         dpSetup();
       }
@@ -381,6 +390,15 @@ public class FileSinkOperator extends Te
         bucketField = recIdInspector.getAllStructFieldRefs().get(1);
         bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
       }
+
+      numRows = 0;
+
+      String context = jc.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_OUT + context, row_count);
+
       initializeChildren(hconf);
     } catch (HiveException e) {
       throw e;
@@ -657,9 +675,9 @@ public class FileSinkOperator extends Te
         fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
       }
 
-
-      if (row_count != null) {
-        row_count.set(row_count.get() + 1);
+      if (++numRows == cntr) {
+        cntr *= 10;
+        LOG.info(toString() + ": records written - " + numRows);
       }
 
       int writerOffset = findWriterOffset(row);
@@ -921,6 +939,9 @@ public class FileSinkOperator extends Te
   @Override
   public void closeOp(boolean abort) throws HiveException {
 
+    row_count.set(numRows);
+    LOG.info(toString() + ": records written - " + numRows);    
+
     if (!bDynParts && !filesCreated) {
       createBucketFiles(fsp);
     }
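
The numRows/cntr pair added above gives logarithmic progress logging: a line is written when the row count reaches 1, 10, 100, 1000, and so on, plus a final line with the exact total at close time, instead of logging per row. Standalone sketch of the counting loop:

    public class LogEveryPowerOfTen {
      public static void main(String[] args) {
        long numRows = 0;
        long cntr = 1;
        for (int i = 0; i < 12345; i++) {
          if (++numRows == cntr) {
            cntr *= 10;
            System.out.println("records written - " + numRows);
          }
        }
        // Final exact count, mirroring the closeOp() update of row_count above.
        System.out.println("records written - " + numRows);
      }
    }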

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Thu Oct 30 16:22:33 2014
@@ -37,29 +37,14 @@ public class FilterOperator extends Oper
     Serializable {
 
   private static final long serialVersionUID = 1L;
-
-  /**
-   * Counter.
-   *
-   */
-  public static enum Counter {
-    FILTERED, PASSED
-  }
-
-  protected final transient LongWritable filtered_count;
-  protected final transient LongWritable passed_count;
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
-  private transient int consecutiveFails;
   private transient int consecutiveSearches;
   private transient IOContext ioContext;
   protected transient int heartbeatInterval;
 
   public FilterOperator() {
     super();
-    filtered_count = new LongWritable();
-    passed_count = new LongWritable();
-    consecutiveFails = 0;
     consecutiveSearches = 0;
   }
 
@@ -73,8 +58,6 @@ public class FilterOperator extends Oper
         conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
       }
 
-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
       ioContext = IOContext.get(hconf);
     } catch (Throwable e) {
@@ -135,17 +118,6 @@ public class FilterOperator extends Oper
         .getPrimitiveJavaObject(condition);
     if (Boolean.TRUE.equals(ret)) {
       forward(row, rowInspector);
-      passed_count.set(passed_count.get() + 1);
-      consecutiveFails = 0;
-    } else {
-      filtered_count.set(filtered_count.get() + 1);
-      consecutiveFails++;
-
-      // In case of a lot of consecutive failures, send a heartbeat in order to
-      // avoid timeout
-      if (((consecutiveFails % heartbeatInterval) == 0) && (reporter != null)) {
-        reporter.progress();
-      }
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Oct 30 16:22:33 2014
@@ -500,7 +500,7 @@ public final class FunctionRegistry {
       Class<? extends GenericUDF> genericUDFClass) {
     if (GenericUDF.class.isAssignableFrom(genericUDFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, functionName,
-          (GenericUDF) ReflectionUtils.newInstance(genericUDFClass, null));
+          ReflectionUtils.newInstance(genericUDFClass, null));
       mFunctions.put(functionName.toLowerCase(), fI);
       registerNativeStatus(fI);
     } else {
@@ -523,7 +523,7 @@ public final class FunctionRegistry {
       Class<? extends GenericUDTF> genericUDTFClass) {
     if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, functionName,
-          (GenericUDTF) ReflectionUtils.newInstance(genericUDTFClass, null));
+          ReflectionUtils.newInstance(genericUDTFClass, null));
       mFunctions.put(functionName.toLowerCase(), fI);
       registerNativeStatus(fI);
     } else {
@@ -534,7 +534,7 @@ public final class FunctionRegistry {
 
   private static FunctionInfo getFunctionInfoFromMetastore(String functionName) {
     FunctionInfo ret = null;
-  
+
     try {
       String dbName;
       String fName;
@@ -577,7 +577,7 @@ public final class FunctionRegistry {
       // Lookup of UDf class failed
       LOG.error("Unable to load UDF class: " + e);
     }
-  
+
     return ret;
   }
 
@@ -599,7 +599,7 @@ public final class FunctionRegistry {
     if (functionInfo != null) {
       loadFunctionResourcesIfNecessary(functionName, functionInfo);
     }
-    
+
     return functionInfo;
   }
 
@@ -732,6 +732,34 @@ public final class FunctionRegistry {
   }
 
   /**
+   * Returns a set of registered function names which match the given pattern.
+   * This is used for the CLI command "SHOW FUNCTIONS LIKE 'regular expression';"
+   *
+   * @param funcPatternStr
+   *          regular expression for the function names of interest
+   * @return set of strings containing the matching function names
+   */
+  public static Set<String> getFunctionNamesByLikePattern(String funcPatternStr) {
+    Set<String> funcNames = new TreeSet<String>();
+    Set<String> allFuncs = getFunctionNames(true);
+    String[] subpatterns = funcPatternStr.trim().split("\\|");
+    for (String subpattern : subpatterns) {
+      subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
+      try {
+        Pattern patternObj = Pattern.compile(subpattern);
+        for (String funcName : allFuncs) {
+          if (patternObj.matcher(funcName).matches()) {
+            funcNames.add(funcName);
+          }
+        }
+      } catch (PatternSyntaxException e) {
+        continue;
+      }
+    }
+    return funcNames;
+  }
+
+  /**
    * Returns the set of synonyms of the supplied function.
    *
    * @param funcName
@@ -954,6 +982,12 @@ public final class FunctionRegistry {
           (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
     }
 
+    // Another special case, because timestamp is not implicitly convertible to numeric types.
+    if ((pgA == PrimitiveGrouping.NUMERIC_GROUP || pgB == PrimitiveGrouping.NUMERIC_GROUP)
+        && (pcA == PrimitiveCategory.TIMESTAMP || pcB == PrimitiveCategory.TIMESTAMP)) {
+      return TypeInfoFactory.doubleTypeInfo;
+    }
+
     for (PrimitiveCategory t : numericTypeList) {
       if (FunctionRegistry.implicitConvertible(pcA, t)
           && FunctionRegistry.implicitConvertible(pcB, t)) {
@@ -984,7 +1018,7 @@ public final class FunctionRegistry {
       // If either is not a numeric type, return null.
       return null;
     }
-    
+
     return (ai > bi) ? pcA : pcB;
   }
 
@@ -1189,7 +1223,7 @@ public final class FunctionRegistry {
       Class<? extends UDAF> udafClass) {
     FunctionInfo fi = new FunctionInfo(isNative,
         functionName.toLowerCase(), new GenericUDAFBridge(
-        (UDAF) ReflectionUtils.newInstance(udafClass, null)));
+        ReflectionUtils.newInstance(udafClass, null)));
     mFunctions.put(functionName.toLowerCase(), fi);
 
     // All aggregate functions should also be usable as window functions
@@ -1537,7 +1571,7 @@ public final class FunctionRegistry {
       clonedUDF = new GenericUDFMacro(bridge.getMacroName(), bridge.getBody(),
           bridge.getColNames(), bridge.getColTypes());
     } else {
-      clonedUDF = (GenericUDF) ReflectionUtils
+      clonedUDF = ReflectionUtils
           .newInstance(genericUDF.getClass(), null);
     }
 
@@ -1576,7 +1610,7 @@ public final class FunctionRegistry {
     if (null == genericUDTF) {
       return null;
     }
-    return (GenericUDTF) ReflectionUtils.newInstance(genericUDTF.getClass(),
+    return ReflectionUtils.newInstance(genericUDTF.getClass(),
         null);
   }
 
@@ -1701,17 +1735,17 @@ public final class FunctionRegistry {
   /**
    * Returns whether the exprNodeDesc is node of "cast".
    */
-  private static boolean isOpCast(ExprNodeDesc desc) {
+  public static boolean isOpCast(ExprNodeDesc desc) {
     if (!(desc instanceof ExprNodeGenericFuncDesc)) {
       return false;
     }
-    GenericUDF genericUDF = ((ExprNodeGenericFuncDesc)desc).getGenericUDF();
-    Class udfClass;
-    if (genericUDF instanceof GenericUDFBridge) {
-      udfClass = ((GenericUDFBridge)genericUDF).getUdfClass();
-    } else {
-      udfClass = genericUDF.getClass();
-    }
+    return isOpCast(((ExprNodeGenericFuncDesc)desc).getGenericUDF());
+  }
+
+  public static boolean isOpCast(GenericUDF genericUDF) {
+    Class udfClass = (genericUDF instanceof GenericUDFBridge) ?
+        ((GenericUDFBridge)genericUDF).getUdfClass() : genericUDF.getClass();
+
     return udfClass == UDFToBoolean.class || udfClass == UDFToByte.class ||
         udfClass == UDFToDouble.class || udfClass == UDFToFloat.class ||
         udfClass == UDFToInteger.class || udfClass == UDFToLong.class ||
@@ -1934,7 +1968,7 @@ public final class FunctionRegistry {
   {
     return getTableFunctionResolver(WINDOWING_TABLE_FUNCTION);
   }
-  
+
   public static boolean isNoopFunction(String fnName) {
     fnName = fnName.toLowerCase();
     return fnName.equals(NOOP_MAP_TABLE_FUNCTION) ||
@@ -1958,17 +1992,18 @@ public final class FunctionRegistry {
    * @return true if function is a UDAF, has WindowFunctionDescription annotation and the annotations
    *         confirms a ranking function, false otherwise
    */
-  public static boolean isRankingFunction(String name){
+  public static boolean isRankingFunction(String name) {
     FunctionInfo info = getFunctionInfo(name);
+    if (info == null) {
+      return false;
+    }
     GenericUDAFResolver res = info.getGenericUDAFResolver();
-    if (res != null){
-      WindowFunctionDescription desc =
-          AnnotationUtils.getAnnotation(res.getClass(), WindowFunctionDescription.class);
-      if (desc != null){
-        return desc.rankingFunction();
-      }
+    if (res == null) {
+      return false;
     }
-    return false;
+    WindowFunctionDescription desc =
+        AnnotationUtils.getAnnotation(res.getClass(), WindowFunctionDescription.class);
+    return (desc != null) && desc.rankingFunction();
   }
 
   /**
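
The new getFunctionNamesByLikePattern() above treats '|' as alternation and '*' as a wildcard, matches case-insensitively, and silently skips malformed sub-patterns. A small standalone illustration of that translation (the function names below are just sample strings):

    import java.util.regex.Pattern;
    import java.util.regex.PatternSyntaxException;

    public class LikePatternDemo {
      public static void main(String[] args) {
        String funcPatternStr = "xpath*|coalesce";
        for (String subpattern : funcPatternStr.trim().split("\\|")) {
          // '*' becomes '.*'; the (?i) flag makes the match case-insensitive.
          String regex = "(?i)" + subpattern.replaceAll("\\*", ".*");
          try {
            Pattern p = Pattern.compile(regex);
            System.out.println(regex + " matches xpath_string: "
                + p.matcher("xpath_string").matches()); // true for the first sub-pattern
            System.out.println(regex + " matches COALESCE: "
                + p.matcher("COALESCE").matches());     // true for the second sub-pattern
          } catch (PatternSyntaxException e) {
            // Malformed sub-patterns are ignored, as in the method above.
          }
        }
      }
    }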

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Oct 30 16:22:33 2014
@@ -63,7 +63,7 @@ public class JoinOperator extends Common
       skewJoinKeyContext.initiliaze(hconf);
       skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
     }
-    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
+    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs);
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -171,9 +171,14 @@ public class MapJoinOperator extends Abs
 
   private void loadHashTable() throws HiveException {
 
-    if ((this.getExecContext() != null)
-        && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext()
-            .getLocalWork().getInputFileChangeSensitive()))) {
+    if ((this.getExecContext() == null)
+        || (this.getExecContext().getLocalWork() == null)
+        || (this.getExecContext().getLocalWork().getInputFileChangeSensitive() == false)
+    ) {
+      /*
+       * This early-exit criterion is not applicable if the local work is sensitive to input file changes.
+       * But the check does not apply if there is no local work, or if this is a reducer vertex (execContext is null).
+       */
       if (hashTblInitedOnce) {
         return;
       } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Oct 30 16:22:33 2014
@@ -23,20 +23,20 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -66,6 +67,7 @@ import org.apache.hadoop.util.StringUtil
  * different from regular operators in that it starts off by processing a
  * Writable data structure from a Table (instead of a Hive Object).
  **/
+@SuppressWarnings("deprecation")
 public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 1L;
@@ -75,60 +77,32 @@ public class MapOperator extends Operato
    *
    */
   public static enum Counter {
-    DESERIALIZE_ERRORS
+    DESERIALIZE_ERRORS,
+    RECORDS_IN
   }
 
   private final transient LongWritable deserialize_error_count = new LongWritable();
-
-  private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
-  private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
-    new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
-
-  protected transient MapOpCtx current;
-  private transient List<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
-  private final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
-
-  private static class MapInputPath {
-    String path;
-    String alias;
-    Operator<?> op;
-    PartitionDesc partDesc;
-
-    /**
-     * @param path
-     * @param alias
-     * @param op
-     */
-    public MapInputPath(String path, String alias, Operator<?> op, PartitionDesc partDesc) {
-      this.path = path;
-      this.alias = alias;
-      this.op = op;
-      this.partDesc = partDesc;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof MapInputPath) {
-        MapInputPath mObj = (MapInputPath) o;
-        return path.equals(mObj.path) && alias.equals(mObj.alias)
-            && op.equals(mObj.op);
-      }
-
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int ret = (path == null) ? 0 : path.hashCode();
-      ret += (alias == null) ? 0 : alias.hashCode();
-      ret += (op == null) ? 0 : op.hashCode();
-      return ret;
-    }
-  }
+  private final transient LongWritable recordCounter = new LongWritable();
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+
+  // input path --> {operator --> context}
+  private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
+      new HashMap<String, Map<Operator<?>, MapOpCtx>>();
+  // child operator --> object inspector (converted OI if it's needed)
+  private final Map<Operator<?>, StructObjectInspector> childrenOpToOI =
+      new HashMap<Operator<?>, StructObjectInspector>();
+
+  // context for current input file
+  protected transient MapOpCtx[] currentCtxs;
+  private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
 
   protected static class MapOpCtx {
 
-    StructObjectInspector tblRawRowObjectInspector;  // columns
+    final String alias;
+    final Operator<?> op;
+    final PartitionDesc partDesc;
+
     StructObjectInspector partObjectInspector;    // partition columns
     StructObjectInspector vcsObjectInspector;     // virtual columns
     StructObjectInspector rowObjectInspector;
@@ -144,6 +118,12 @@ public class MapOperator extends Operato
     List<VirtualColumn> vcs;
     Object[] vcValues;
 
+    public MapOpCtx(String alias, Operator<?> op, PartitionDesc partDesc) {
+      this.alias = alias;
+      this.op = op;
+      this.partDesc = partDesc;
+    }
+
     private boolean isPartitioned() {
       return partObjectInspector != null;
     }
@@ -152,12 +132,30 @@ public class MapOperator extends Operato
       return vcsObjectInspector != null;
     }
 
-    private Object readRow(Writable value) throws SerDeException {
-      return partTblObjectInspectorConverter.convert(deserializer.deserialize(value));
+    private Object readRow(Writable value, ExecMapperContext context) throws SerDeException {
+      Object deserialized = deserializer.deserialize(value);
+      Object row = partTblObjectInspectorConverter.convert(deserialized);
+      if (hasVC()) {
+        rowWithPartAndVC[0] = row;
+        if (context != null) {
+          populateVirtualColumnValues(context, vcs, vcValues, deserializer);
+        }
+        int vcPos = isPartitioned() ? 2 : 1;
+        rowWithPartAndVC[vcPos] = vcValues;
+        return rowWithPartAndVC;
+      } else if (isPartitioned()) {
+        rowWithPart[0] = row;
+        return rowWithPart;
+      }
+      return row;
     }
 
-    public StructObjectInspector getRowObjectInspector() {
-      return rowObjectInspector;
+    public boolean forward(Object row) throws HiveException {
+      if (op.getDone()) {
+        return false;
+      }
+      op.processOp(row, 0);
+      return true;
     }
   }
 
@@ -170,20 +168,19 @@ public class MapOperator extends Operato
    * @param mapWork
    * @throws HiveException
    */
-  public void initializeAsRoot(Configuration hconf, MapWork mapWork)
-      throws HiveException {
+  @VisibleForTesting
+  void initializeAsRoot(JobConf hconf, MapWork mapWork) throws Exception {
     setConf(mapWork);
     setChildren(hconf);
+    setExecContext(new ExecMapperContext(hconf));
     initialize(hconf, null);
   }
 
-  private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
-      Map<TableDesc, StructObjectInspector> convertedOI) throws Exception {
-
-    PartitionDesc pd = ctx.partDesc;
+  private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
+      StructObjectInspector tableRowOI) throws Exception {
+    PartitionDesc pd = opCtx.partDesc;
     TableDesc td = pd.getTableDesc();
 
-    MapOpCtx opCtx = new MapOpCtx();
     // Use table properties in case of unpartitioned tables,
     // and the union of table properties and partition properties, with partition
     // taking precedence, in the case of partitioned tables
@@ -194,18 +191,13 @@ public class MapOperator extends Operato
 
     opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
     opCtx.partName = String.valueOf(partSpec);
-
-    Class serdeclass = hconf.getClassByName(pd.getSerdeClassName());
-    opCtx.deserializer = (Deserializer) serdeclass.newInstance();
-    SerDeUtils.initializeSerDe(opCtx.deserializer, hconf, td.getProperties(), pd.getProperties());
+    opCtx.deserializer = pd.getDeserializer(hconf);
 
     StructObjectInspector partRawRowObjectInspector =
         (StructObjectInspector) opCtx.deserializer.getObjectInspector();
 
-    opCtx.tblRawRowObjectInspector = convertedOI.get(td);
-
-    opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-        partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
+    opCtx.partTblObjectInspectorConverter =
+        ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
 
     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
@@ -253,8 +245,8 @@ public class MapOperator extends Operato
     // The op may not be a TableScan for mapjoins
     // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key;
     // In that case, it will be a Select, but the rowOI need not be amended
-    if (ctx.op instanceof TableScanOperator) {
-      TableScanOperator tsOp = (TableScanOperator) ctx.op;
+    if (opCtx.op instanceof TableScanOperator) {
+      TableScanOperator tsOp = (TableScanOperator) opCtx.op;
       TableScanDesc tsDesc = tsOp.getConf();
       if (tsDesc != null && tsDesc.hasVirtualCols()) {
         opCtx.vcs = tsDesc.getVirtualCols();
@@ -268,11 +260,11 @@ public class MapOperator extends Operato
       }
     }
     if (!opCtx.hasVC() && !opCtx.isPartitioned()) {
-      opCtx.rowObjectInspector = opCtx.tblRawRowObjectInspector;
+      opCtx.rowObjectInspector = tableRowOI;
       return opCtx;
     }
     List<StructObjectInspector> inspectors = new ArrayList<StructObjectInspector>();
-    inspectors.add(opCtx.tblRawRowObjectInspector);
+    inspectors.add(tableRowOI);
     if (opCtx.isPartitioned()) {
       inspectors.add(opCtx.partObjectInspector);
     }
@@ -302,19 +294,14 @@ public class MapOperator extends Operato
       for (String onefile : conf.getPathToAliases().keySet()) {
         PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
         TableDesc tableDesc = pd.getTableDesc();
-        Properties tblProps = tableDesc.getProperties();
-        Class sdclass = hconf.getClassByName(pd.getSerdeClassName());
-        Deserializer partDeserializer = (Deserializer) sdclass.newInstance();
-        SerDeUtils.initializeSerDe(partDeserializer, hconf, tblProps, pd.getProperties());
-        StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
-            .getObjectInspector();
+        Deserializer partDeserializer = pd.getDeserializer(hconf);
+        StructObjectInspector partRawRowObjectInspector =
+            (StructObjectInspector) partDeserializer.getObjectInspector();
 
         StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
         if ((tblRawRowObjectInspector == null) ||
             (identityConverterTableDesc.contains(tableDesc))) {
-            sdclass = hconf.getClassByName(tableDesc.getSerdeClassName());
-            Deserializer tblDeserializer = (Deserializer) sdclass.newInstance();
-            SerDeUtils.initializeSerDe(tblDeserializer, hconf, tblProps, null);
+          Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
           tblRawRowObjectInspector =
               (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
                   partRawRowObjectInspector,
@@ -338,155 +325,153 @@ public class MapOperator extends Operato
     return tableDescOI;
   }
 
-  public void setChildren(Configuration hconf) throws HiveException {
-    Path fpath = IOContext.get(hconf).getInputPath();
-
-    boolean schemeless = fpath.toUri().getScheme() == null;
+  public void setChildren(Configuration hconf) throws Exception {
 
     List<Operator<? extends OperatorDesc>> children =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
     Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
 
-    try {
-      for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
-        String onefile = entry.getKey();
-        List<String> aliases = entry.getValue();
-
-        Path onepath = new Path(onefile);
-        if (schemeless) {
-          onepath = new Path(onepath.toUri().getPath());
-        }
-
-        PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
-
-        for (String onealias : aliases) {
-          Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding alias " + onealias + " to work list for file "
-               + onefile);
-          }
-          MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
-          if (opCtxMap.containsKey(inp)) {
-            continue;
-          }
-          MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI);
-          opCtxMap.put(inp, opCtx);
+    for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+      String onefile = entry.getKey();
+      List<String> aliases = entry.getValue();
+      PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
+
+      for (String alias : aliases) {
+        Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
+        if (isLogDebugEnabled) {
+          LOG.debug("Adding alias " + alias + " to work list for file "
+              + onefile);
+        }
+        Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile);
+        if (contexts == null) {
+          opCtxMap.put(onefile, contexts = new LinkedHashMap<Operator<?>, MapOpCtx>());
+        }
+        if (contexts.containsKey(op)) {
+          continue;
+        }
+        MapOpCtx context = new MapOpCtx(alias, op, partDesc);
+        StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc());
+        contexts.put(op, initObjectInspector(hconf, context, tableRowOI));
 
-          op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        if (!children.contains(op)) {
+          op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));
           op.getParentOperators().add(this);
-          // check for the operators who will process rows coming to this Map
-          // Operator
-          if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
-            children.add(op);
-            childrenOpToOpCtxMap.put(op, opCtx);
-            LOG.info("dump " + op + " "
-                + opCtxMap.get(inp).rowObjectInspector.getTypeName());
-          }
-          current = opCtx;  // just need for TestOperators.testMapOperator
+          children.add(op);
         }
       }
+    }
+
+    initOperatorContext(children);
 
-      if (children.size() == 0) {
-        // didn't find match for input file path in configuration!
-        // serious problem ..
-        LOG.error("Configuration does not have any alias for path: "
-            + fpath.toUri());
-        throw new HiveException("Configuration and input path are inconsistent");
+    // we found all the operators that we are supposed to process.
+    setChildOperators(children);
+  }
+
+  private void initOperatorContext(List<Operator<? extends OperatorDesc>> children)
+      throws HiveException {
+    for (Map<Operator<?>, MapOpCtx> contexts : opCtxMap.values()) {
+      for (MapOpCtx context : contexts.values()) {
+        if (!children.contains(context.op)) {
+          continue;
+        }
+        StructObjectInspector prev =
+            childrenOpToOI.put(context.op, context.rowObjectInspector);
+        if (prev != null && !prev.equals(context.rowObjectInspector)) {
+          throw new HiveException("Conflict on row inspector for " + context.alias);
+        }
+        if (isLogDebugEnabled) {
+          LOG.debug("dump " + context.op + " " + context.rowObjectInspector.getTypeName());
+        }
       }
+    }
+  }
 
-      // we found all the operators that we are supposed to process.
-      setChildOperators(children);
-    } catch (Exception e) {
-      throw new HiveException(e);
+  private String getNominalPath(Path fpath) {
+    String nominal = null;
+    boolean schemaless = fpath.toUri().getScheme() == null;
+    for (String onefile : conf.getPathToAliases().keySet()) {
+      Path onepath = normalizePath(onefile, schemaless);
+      // check for the operators who will process rows coming to this Map Operator
+      if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+        // not from this path
+        continue;
+      }
+      if (nominal != null) {
+        throw new IllegalStateException("Ambiguous input path " + fpath);
+      }
+      nominal = onefile;
+    }
+    if (nominal == null) {
+      throw new IllegalStateException("Invalid input path " + fpath);
     }
+    return nominal;
   }
 
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
 
-    List<Operator<? extends OperatorDesc>> children = getChildOperators();
+    numRows = 0;
 
-    for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
-        .entrySet()) {
-      Operator<? extends OperatorDesc> child = entry.getKey();
-      MapOpCtx mapOpCtx = entry.getValue();
-      // Add alias, table name, and partitions to hadoop conf so that their
-      // children will inherit these
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-      child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowObjectInspector});
-    }
-
-    for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
-      MapInputPath input = entry.getKey();
-      MapOpCtx mapOpCtx = entry.getValue();
-      // Add alias, table name, and partitions to hadoop conf so that their
-      // children will inherit these
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-
-      Operator<? extends OperatorDesc> op = input.op;
-      if (children.indexOf(op) == -1) {
-        // op is not in the children list, so need to remember it and close it afterwards
-        if (extraChildrenToClose == null) {
-          extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
-        }
-        extraChildrenToClose.add(op);
-        op.initialize(hconf, new ObjectInspector[] {entry.getValue().rowObjectInspector});
-      }
+    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    if (context != null && !context.isEmpty()) {
+      context = "_" + context.replace(" ", "_");
+    }
+    statsMap.put(Counter.RECORDS_IN + context, recordCounter);
+
+    for (Entry<Operator<?>, StructObjectInspector> entry : childrenOpToOI.entrySet()) {
+      Operator<?> child = entry.getKey();
+      child.initialize(hconf, new ObjectInspector[] {entry.getValue()});
     }
   }
 
-  /**
-   * close extra child operators that are initialized but are not executed.
-   */
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (extraChildrenToClose != null) {
-      for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
-        op.close(abort);
-      }
-    }
+    recordCounter.set(numRows);
+    super.closeOp(abort);
   }
 
   // Find context for current input file
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
+    super.cleanUpInputFileChangedOp();
     Path fpath = getExecContext().getCurrentInputPath();
-
-    for (String onefile : conf.getPathToAliases().keySet()) {
-      Path onepath = normalizePath(onefile);
-      // check for the operators who will process rows coming to this Map
-      // Operator
-      if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
-        // not from this
-        continue;
-      }
-      PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
-      for (String onealias : conf.getPathToAliases().get(onefile)) {
-        Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-        MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
-        MapOpCtx context = opCtxMap.get(inp);
-        if (context != null) {
-          current = context;
-          LOG.info("Processing alias " + onealias + " for file " + onefile);
-          return;
+    String nominalPath = getNominalPath(fpath);
+    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+    if (isLogInfoEnabled) {
+      StringBuilder builder = new StringBuilder();
+      for (MapOpCtx context : contexts.values()) {
+        if (builder.length() > 0) {
+          builder.append(", ");
         }
+        builder.append(context.alias);
+      }
+      if (isLogDebugEnabled) {
+        LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath);
       }
     }
-    throw new IllegalStateException("Invalid path " + fpath);
+    // Add alias, table name, and partitions to hadoop conf so that their
+    // children will inherit these
+    for (Entry<Operator<?>, MapOpCtx> entry : contexts.entrySet()) {
+      Operator<?> operator = entry.getKey();
+      MapOpCtx context = entry.getValue();
+      operator.setInputContext(nominalPath, context.tableName, context.partName);
+    }
+    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
   }
 
-  private Path normalizePath(String onefile) {
+  private Path normalizePath(String onefile, boolean schemaless) {
     //creating Path is expensive, so cache the corresponding
     //Path object in normalizedPaths
     Path path = normalizedPaths.get(onefile);
-    if(path == null){
+    if (path == null) {
       path = new Path(onefile);
+      if (schemaless && path.toUri().getScheme() != null) {
+        path = new Path(path.toUri().getPath());
+      }
       normalizedPaths.put(onefile, path);
     }
     return path;
@@ -500,50 +485,48 @@ public class MapOperator extends Operato
       // The child operators cleanup if input file has changed
       cleanUpInputFileChanged();
     }
-    Object row;
-    try {
-      row = current.readRow(value);
-      if (current.hasVC()) {
-        current.rowWithPartAndVC[0] = row;
-        if (context != null) {
-          populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer);
-        }
-        int vcPos = current.isPartitioned() ? 2 : 1;
-        current.rowWithPartAndVC[vcPos] = current.vcValues;
-        row = current.rowWithPartAndVC;
-      } else if (current.isPartitioned()) {
-        current.rowWithPart[0] = row;
-        row = current.rowWithPart;
-      }
-    } catch (Exception e) {
-      // Serialize the row and output.
-      String rawRowString;
+    int childrenDone = 0;
+    for (MapOpCtx current : currentCtxs) {
+      Object row = null;
       try {
-        rawRowString = value.toString();
-      } catch (Exception e2) {
-        rawRowString = "[Error getting row data with exception " +
-            StringUtils.stringifyException(e2) + " ]";
+        row = current.readRow(value, context);
+        if (!current.forward(row)) {
+          childrenDone++;
+        }
+      } catch (Exception e) {
+        // TODO: policy on deserialization errors
+        String message = toErrorMessage(value, row, current.rowObjectInspector);
+        if (row == null) {
+          deserialize_error_count.set(deserialize_error_count.get() + 1);
+          throw new HiveException("Hive Runtime Error while processing writable " + message, e);
+        }
+        throw new HiveException("Hive Runtime Error while processing row " + message, e);
       }
+    }
+    rowsForwarded(childrenDone, 1);
+  }
 
-      // TODO: policy on deserialization errors
-      deserialize_error_count.set(deserialize_error_count.get() + 1);
-      throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
+  protected final void rowsForwarded(int childrenDone, int rows) {
+    numRows += rows;
+    if (isLogInfoEnabled) {
+      while (numRows >= cntr) {
+        cntr *= 10;
+        LOG.info(toString() + ": records read - " + numRows);
+      }
     }
+    if (childrenDone == currentCtxs.length) {
+      setDone(true);
+    }
+  }
 
-    // The row has been converted to comply with table schema, irrespective of partition schema.
-    // So, use tblOI (and not partOI) for forwarding
+  private String toErrorMessage(Writable value, Object row, ObjectInspector inspector) {
     try {
-      forward(row, current.rowObjectInspector);
-    } catch (Exception e) {
-      // Serialize the row and output the error message.
-      String rowString;
-      try {
-        rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector);
-      } catch (Exception e2) {
-        rowString = "[Error getting row data with exception " +
-            StringUtils.stringifyException(e2) + " ]";
+      if (row != null) {
+        return SerDeUtils.getJSONString(row, inspector);
       }
-      throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
+      return String.valueOf(value);
+    } catch (Exception e) {
+      return "[Error getting row data with exception " + StringUtils.stringifyException(e) + " ]";
     }
   }
 
@@ -639,4 +622,16 @@ public class MapOperator extends Operato
   public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
     return MapRecordProcessor.getConnectOps();
   }
+
+  public void initializeContexts() {
+    Path fpath = getExecContext().getCurrentInputPath();
+    String nominalPath = getNominalPath(fpath);
+    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
+  }
+
+  public Deserializer getCurrentDeserializer() {
+    return currentCtxs[0].deserializer;
+  }
 }

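The reworked MapOperator above forwards each row to every context registered for the current input path, counts the forwarded rows, and logs progress only when the count crosses a power of ten (the numRows/cntr pair in rowsForwarded). A minimal standalone sketch of that threshold pattern, with an invented ProgressCounter class and System.out standing in for the operator's guarded LOG.info:

    // Illustration only: class and method names are made up; the real logic lives
    // in MapOperator.rowsForwarded() and is guarded by isLogInfoEnabled.
    public class ProgressCounter {
      private long numRows = 0;
      private long cntr = 1;            // next row count at which to log

      public void rowsForwarded(int rows) {
        numRows += rows;
        while (numRows >= cntr) {
          cntr *= 10;                   // fires at 1, 10, 100, 1000, ... rows
          System.out.println("records read - " + numRows);
        }
      }

      public static void main(String[] args) {
        ProgressCounter counter = new ProgressCounter();
        for (int i = 0; i < 1234; i++) {
          counter.rowsForwarded(1);     // prints at rows 1, 10, 100 and 1000
        }
      }
    }

The operator also publishes the final total through the new RECORDS_IN counter: closeOp() copies numRows into recordCounter, which initializeOp() registered in statsMap under the context-suffixed counter name.
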
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Oct 30 16:22:33 2014
@@ -61,6 +61,7 @@ public abstract class Operator<T extends
 
   public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
   public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+  public static final String CONTEXT_NAME_KEY = "__hive.context.name";
 
   private transient Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -210,11 +211,14 @@ public abstract class Operator<T extends
 
   // non-bean ..
 
-  protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+  protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
   @SuppressWarnings("rawtypes")
   protected transient OutputCollector out;
-  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
-  protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
+  protected transient final Log LOG = LogFactory.getLog(getClass().getName());
+  protected transient final Log PLOG = LogFactory.getLog(Operator.class.getName()); // allows disabling logging from all operators at once
+  protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
+  protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
+  protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
   protected transient String alias;
   protected transient Reporter reporter;
   protected transient String id;
@@ -287,9 +291,9 @@ public abstract class Operator<T extends
     }
   }
 
-  public Map<Enum<?>, Long> getStats() {
-    HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
-    for (Enum<?> one : statsMap.keySet()) {
+  public Map<String, Long> getStats() {
+    HashMap<String, Long> ret = new HashMap<String, Long>();
+    for (String one : statsMap.keySet()) {
       ret.put(one, Long.valueOf(statsMap.get(one).get()));
     }
     return (ret);
@@ -490,33 +494,45 @@ public abstract class Operator<T extends
   public abstract void processOp(Object row, int tag) throws HiveException;
 
   protected final void defaultStartGroup() throws HiveException {
-    LOG.debug("Starting group");
+    if (isLogDebugEnabled) {
+      LOG.debug("Starting group");
+    }
 
     if (childOperators == null) {
       return;
     }
 
-    LOG.debug("Starting group for children:");
+    if (isLogDebugEnabled) {
+      LOG.debug("Starting group for children:");
+    }
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.startGroup();
     }
 
-    LOG.debug("Start group Done");
+    if (isLogDebugEnabled) {
+      LOG.debug("Start group Done");
+    }
   }
 
   protected final void defaultEndGroup() throws HiveException {
-    LOG.debug("Ending group");
+    if (isLogDebugEnabled) {
+      LOG.debug("Ending group");
+    }
 
     if (childOperators == null) {
       return;
     }
 
-    LOG.debug("Ending group for children:");
+    if (isLogDebugEnabled) {
+      LOG.debug("Ending group for children:");
+    }
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.endGroup();
     }
 
-    LOG.debug("End group Done");
+    if (isLogDebugEnabled) {
+      LOG.debug("End group Done");
+    }
   }
 
   // If a operator wants to do some work at the beginning of a group
@@ -807,7 +823,7 @@ public abstract class Operator<T extends
   }
 
   public void resetStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
     }
   }
@@ -840,7 +856,7 @@ public abstract class Operator<T extends
   }
 
   public void logStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
       LOG.info(e.toString() + ":" + statsMap.get(e).toString());
     }
   }
@@ -1046,6 +1062,17 @@ public abstract class Operator<T extends
   public void cleanUpInputFileChangedOp() throws HiveException {
   }
 
+  // called by map operator. propagated recursively to single parented descendants
+  public void setInputContext(String inputPath, String tableName, String partitionName) {
+    if (childOperators != null) {
+      for (Operator<? extends OperatorDesc> child : childOperators) {
+        if (child.getNumParent() == 1) {
+          child.setInputContext(inputPath, tableName, partitionName);
+        }
+      }
+    }
+  }
+
   public boolean supportSkewJoinOptimization() {
     return false;
   }
@@ -1263,7 +1290,7 @@ public abstract class Operator<T extends
   }
 
   public void setOpTraits(OpTraits metaInfo) {
-    if (LOG.isDebugEnabled()) {
+    if (isLogDebugEnabled) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
     }
     if (conf != null) {
@@ -1274,7 +1301,7 @@ public abstract class Operator<T extends
   }
 
   public void setStatistics(Statistics stats) {
-    if (LOG.isDebugEnabled()) {
+    if (isLogDebugEnabled) {
       LOG.debug("Setting stats ("+stats+") on "+this);
     }
     if (conf != null) {

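The Operator changes above replace per-call LOG.isDebugEnabled() checks with cached flags that combine the subclass logger (LOG) with a shared logger named after Operator (PLOG), so guarded messages skip string concatenation and all operator logging can be silenced by lowering that one logger. A rough sketch of the same pattern, using commons-logging directly and an invented ExampleOperator class:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // Invented class; shows only the cached log-guard pattern from the patch above.
    public class ExampleOperator {
      // per-subclass logger, named after the concrete class
      private final Log LOG = LogFactory.getLog(getClass().getName());
      // shared logger: turning it down silences every ExampleOperator subclass at once
      private final Log PLOG = LogFactory.getLog(ExampleOperator.class.getName());
      private final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();

      public void process(Object row) {
        if (isLogDebugEnabled) {        // guard avoids building the message when debug is off
          LOG.debug("processing " + row);
        }
      }
    }

The same hunk set also moves statsMap from Enum keys to String keys, which is what lets MapOperator register its RECORDS_IN counter with a suffix taken from the __hive.context.name property.
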
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Thu Oct 30 16:22:33 2014
@@ -143,4 +143,8 @@ public class OperatorUtils {
       }
     }
   }
+
+  public static boolean sameRowSchema(Operator<?> operator1, Operator<?> operator2) {
+    return operator1.getSchema().equals(operator2.getSchema());
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Thu Oct 30 16:22:33 2014
@@ -64,6 +64,7 @@ public class OrcFileMergeOperator extend
 
   private void processKeyValuePairs(Object key, Object value)
       throws HiveException {
+    String filePath = "";
     try {
       OrcFileValueWrapper v;
       OrcFileKeyWrapper k;
@@ -72,6 +73,7 @@ public class OrcFileMergeOperator extend
       } else {
         k = (OrcFileKeyWrapper) key;
       }
+      filePath = k.getInputPath().toUri().getPath();
 
       fixTmpPath(k.getInputPath().getParent());
 
@@ -131,6 +133,16 @@ public class OrcFileMergeOperator extend
       this.exception = true;
       closeOp(true);
       throw new HiveException(e);
+    } finally {
+      if (fdis != null) {
+        try {
+          fdis.close();
+        } catch (IOException e) {
+          throw new HiveException(String.format("Unable to close file %s", filePath), e);
+        } finally {
+          fdis = null;
+        }
+      }
     }
   }
 

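The OrcFileMergeOperator change above adds a finally block so the input stream field (fdis) is always closed and then cleared, preventing a failed merge from leaking the previous file's stream into the next key/value pair. A small sketch of that close-and-null idiom, with an invented StreamHolder class and a plain java.io.Closeable standing in for the ORC reader stream; the patch itself additionally wraps the close in a catch that rethrows HiveException with the file path:

    import java.io.Closeable;
    import java.io.IOException;

    // Invented class; illustrates only the close-and-null pattern from the patch above.
    public class StreamHolder {
      private Closeable fdis;           // reused across calls, like the operator's field

      public void processOne(Closeable in) throws IOException {
        fdis = in;
        try {
          // ... work that may throw, e.g. reading a footer or copying stripes ...
        } finally {
          if (fdis != null) {
            try {
              fdis.close();             // always attempt the close
            } finally {
              fdis = null;              // forget the stream even if close() threw
            }
          }
        }
      }
    }
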
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Oct 30 16:22:33 2014
@@ -337,17 +337,20 @@ public class PTFOperator extends Operato
         handleOutputRows(tabFn.finishPartition());
       } else {
         if ( tabFn.canIterateOutput() ) {
-          outputPartRowsItr = tabFn.iterator(inputPart.iterator());
+          outputPartRowsItr = inputPart == null ? null :
+            tabFn.iterator(inputPart.iterator());
         } else {
-          outputPart = tabFn.execute(inputPart);
-          outputPartRowsItr = outputPart.iterator();
+          outputPart = inputPart == null ? null : tabFn.execute(inputPart);
+          outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
         }
         if ( next != null ) {
           if (!next.isStreaming() && !isOutputIterator() ) {
             next.inputPart = outputPart;
           } else {
-            while(outputPartRowsItr.hasNext() ) {
-              next.processRow(outputPartRowsItr.next());
+            if ( outputPartRowsItr != null ) {
+              while(outputPartRowsItr.hasNext() ) {
+                next.processRow(outputPartRowsItr.next());
+              }
             }
           }
         }
@@ -357,8 +360,10 @@ public class PTFOperator extends Operato
         next.finishPartition();
       } else {
         if (!isStreaming() ) {
-          while(outputPartRowsItr.hasNext() ) {
-            forward(outputPartRowsItr.next(), outputObjInspector);
+          if ( outputPartRowsItr != null ) {
+            while(outputPartRowsItr.hasNext() ) {
+              forward(outputPartRowsItr.next(), outputObjInspector);
+            }
           }
         }
       }