Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC

svn commit: r1522098 [15/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Sep 12 01:21:10 2013
@@ -40,8 +40,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.history
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.Hook;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -82,8 +83,8 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -418,25 +419,28 @@ public class Driver implements CommandPr
       ctx.setCmd(command);
       ctx.setHDFSCleanup(true);
 
+      perfLogger.PerfLogBegin(LOG, PerfLogger.PARSE);
       ParseDriver pd = new ParseDriver();
       ASTNode tree = pd.parse(command, ctx);
       tree = ParseUtils.findRootNonNullToken(tree);
+      perfLogger.PerfLogEnd(LOG, PerfLogger.PARSE);
 
+      perfLogger.PerfLogBegin(LOG, PerfLogger.ANALYZE);
       BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
-      List<AbstractSemanticAnalyzerHook> saHooks =
+      List<HiveSemanticAnalyzerHook> saHooks =
           getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
-                   AbstractSemanticAnalyzerHook.class);
+              HiveSemanticAnalyzerHook.class);
 
       // Do semantic analysis and plan generation
       if (saHooks != null) {
         HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
         hookCtx.setConf(conf);
-        for (AbstractSemanticAnalyzerHook hook : saHooks) {
+        for (HiveSemanticAnalyzerHook hook : saHooks) {
           tree = hook.preAnalyze(hookCtx, tree);
         }
         sem.analyze(tree, ctx);
         hookCtx.update(sem);
-        for (AbstractSemanticAnalyzerHook hook : saHooks) {
+        for (HiveSemanticAnalyzerHook hook : saHooks) {
           hook.postAnalyze(hookCtx, sem.getRootTasks());
         }
       } else {
@@ -447,6 +451,7 @@ public class Driver implements CommandPr
 
       // validate the plan
       sem.validate();
+      perfLogger.PerfLogEnd(LOG, PerfLogger.ANALYZE);
 
       plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
 
@@ -460,12 +465,12 @@ public class Driver implements CommandPr
 
         // serialize the queryPlan
         FileOutputStream fos = new FileOutputStream(queryPlanFileName);
-        Utilities.serializeObject(plan, fos);
+        Utilities.serializePlan(plan, fos, conf);
         fos.close();
 
         // deserialize the queryPlan
         FileInputStream fis = new FileInputStream(queryPlanFileName);
-        QueryPlan newPlan = Utilities.deserializeObject(fis);
+        QueryPlan newPlan = Utilities.deserializePlan(fis, QueryPlan.class, conf);
         fis.close();
 
         // Use the deserialized plan
@@ -949,7 +954,8 @@ public class Driver implements CommandPr
     // Get all the driver run hooks and pre-execute them.
     List<HiveDriverRunHook> driverRunHooks;
     try {
-      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class);
+      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
+          HiveDriverRunHook.class);
       for (HiveDriverRunHook driverRunHook : driverRunHooks) {
           driverRunHook.preDriverRun(hookContext);
       }
@@ -1064,19 +1070,14 @@ public class Driver implements CommandPr
 
   /**
    * Returns a set of hooks specified in a configuration variable.
-   *
    * See getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
-   * @param hookConfVar
-   * @return
-   * @throws Exception
    */
   private List<Hook> getHooks(HiveConf.ConfVars hookConfVar) throws Exception {
     return getHooks(hookConfVar, Hook.class);
   }
 
   /**
-   * Returns the hooks specified in a configuration variable.  The hooks are returned in a list in
-   * the order they were specified in the configuration variable.
+   * Returns the hooks specified in a configuration variable.
    *
    * @param hookConfVar The configuration variable specifying a comma separated list of the hook
    *                    class names.
@@ -1085,34 +1086,14 @@ public class Driver implements CommandPr
    *                    they are listed in the value of hookConfVar
    * @throws Exception
    */
-  private <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
-      throws Exception {
-
-    List<T> hooks = new ArrayList<T>();
-    String csHooks = conf.getVar(hookConfVar);
-    if (csHooks == null) {
-      return hooks;
-    }
-
-    csHooks = csHooks.trim();
-    if (csHooks.equals("")) {
-      return hooks;
-    }
-
-    String[] hookClasses = csHooks.split(",");
-
-    for (String hookClass : hookClasses) {
-      try {
-        T hook =
-            (T) Class.forName(hookClass.trim(), true, JavaUtils.getClassLoader()).newInstance();
-        hooks.add(hook);
-      } catch (ClassNotFoundException e) {
-        console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
-        throw e;
-      }
+  private <T extends Hook> List<T> getHooks(ConfVars hookConfVar,
+      Class<T> clazz) throws Exception {
+    try {
+      return HookUtils.getHooks(conf, hookConfVar, clazz);
+    } catch (ClassNotFoundException e) {
+      console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
+      throw e;
     }
-
-    return hooks;
   }
 
   public int execute() throws CommandNeedRetryException {
@@ -1202,11 +1183,13 @@ public class Driver implements CommandPr
       }
 
       perfLogger.PerfLogEnd(LOG, PerfLogger.TIME_TO_SUBMIT);
+      perfLogger.PerfLogBegin(LOG, PerfLogger.RUN_TASKS);
       // Loop while you either have tasks running, or tasks queued up
       while (running.size() != 0 || runnable.peek() != null) {
         // Launch upto maxthreads tasks
         while (runnable.peek() != null && running.size() < maxthreads) {
           Task<? extends Serializable> tsk = runnable.remove();
+          perfLogger.PerfLogBegin(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
           launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
         }
 
@@ -1214,6 +1197,7 @@ public class Driver implements CommandPr
         TaskResult tskRes = pollTasks(running.keySet());
         TaskRunner tskRun = running.remove(tskRes);
         Task<? extends Serializable> tsk = tskRun.getTask();
+        perfLogger.PerfLogEnd(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
         hookContext.addCompleteTask(tskRun);
 
         int exitVal = tskRes.getExitVal();
@@ -1277,6 +1261,7 @@ public class Driver implements CommandPr
           }
         }
       }
+      perfLogger.PerfLogEnd(LOG, PerfLogger.RUN_TASKS);
 
       // in case we decided to run everything in local mode, restore the
       // the jobtracker setting to its initial value
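
Two things happen in this set of Driver.java hunks: PerfLogBegin/PerfLogEnd pairs now bracket the PARSE, ANALYZE, RUN_TASKS, and per-task phases, and the reflective hook-loading loop is deleted in favor of HookUtils.getHooks(). A minimal sketch of what the centralized loader presumably does, reconstructed from the code removed above (the class and method below are illustrative, not Hive's actual HookUtils API):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-in for the loader HookUtils presumably provides,
    // reconstructed from the loop deleted from Driver.getHooks() above.
    public final class HookLoaderSketch {
      public static <T> List<T> loadHooks(String csHooks, Class<T> clazz,
          ClassLoader loader) throws Exception {
        List<T> hooks = new ArrayList<T>();
        if (csHooks == null || csHooks.trim().isEmpty()) {
          return hooks;
        }
        for (String hookClass : csHooks.trim().split(",")) {
          // Load and instantiate each configured hook class reflectively.
          T hook = clazz.cast(
              Class.forName(hookClass.trim(), true, loader).newInstance());
          hooks.add(hook);
        }
        return hooks;
      }
    }

Driver keeps only the ClassNotFoundException reporting at the call site, so the console error message is unchanged.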

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Thu Sep 12 01:21:10 2013
@@ -144,7 +144,7 @@ public enum ErrorMsg {
   COLUMN_ALIAS_ALREADY_EXISTS(10074, "Column alias already exists:", "42S02"),
   UDTF_MULTIPLE_EXPR(10075, "Only a single expression in the SELECT clause is "
       + "supported with UDTF's"),
-  UDTF_REQUIRE_AS(10076, "UDTF's require an AS clause"),
+  @Deprecated UDTF_REQUIRE_AS(10076, "UDTF's require an AS clause"),
   UDTF_NO_GROUP_BY(10077, "GROUP BY is not supported with a UDTF in the SELECT clause"),
   UDTF_NO_SORT_BY(10078, "SORT BY is not supported with a UDTF in the SELECT clause"),
   UDTF_NO_CLUSTER_BY(10079, "CLUSTER BY is not supported with a UDTF in the SELECT clause"),
@@ -360,6 +360,8 @@ public enum ErrorMsg {
   CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
   BAD_LOCATION_VALUE(10244, "{0}  is not absolute or has no scheme information.  Please specify a complete absolute uri with scheme information."),
   UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+  INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
+  MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
 
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -616,8 +618,8 @@ public enum ErrorMsg {
     return format(new String[]{reason});
   }
   /**
-   * If the message is parametrized, this will fill the parameters with supplied 
-   * {@code reasons}, otherwise {@code reasons} are appended at the end of the 
+   * If the message is parametrized, this will fill the parameters with supplied
+   * {@code reasons}, otherwise {@code reasons} are appended at the end of the
    * message.
    */
   public String format(String... reasons) {
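
The new INVALID_BIGTABLE_MAPJOIN entry passes true as the trailing flag, marking its message as parametrized, while MISSING_OVER_CLAUSE does not; per the format() Javadoc touched above, the first has its {0} placeholder filled in and the second has the reason appended. A small sketch of the two behaviors, assuming MessageFormat-style substitution:

    import java.text.MessageFormat;

    public class ErrorFormatSketch {
      public static void main(String[] args) {
        // Parametrized message: {0} is filled in, as with
        // INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true)
        String parametrized = "{0} table chosen for streaming is not valid";
        System.out.println(MessageFormat.format(parametrized, "lineitem"));
        // -> lineitem table chosen for streaming is not valid

        // Non-parametrized message: the reason is appended, as with
        // MISSING_OVER_CLAUSE(10247, "Missing over clause for function : ")
        String plain = "Missing over clause for function : ";
        System.out.println(plain + "rank");
        // -> Missing over clause for function : rank
      }
    }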

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -23,8 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -54,7 +53,7 @@ public abstract class AbstractMapJoinOpe
 
   protected transient byte posBigTable = -1; // one of the tables that is not in memory
 
-  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+  protected transient RowContainer<List<Object>> emptyList = null;
 
   transient int numMapRowsRead;
 
@@ -95,9 +94,9 @@ public abstract class AbstractMapJoinOpe
     // all other tables are small, and are cached in the hash table
     posBigTable = (byte) conf.getPosBigTable();
 
-    emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
+    emptyList = new RowContainer<List<Object>>(1, hconf, reporter);
 
-    RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
+    RowContainer<List<Object>> bigPosRC = JoinUtil.getRowContainer(hconf,
         rowContainerStandardObjectInspectors[posBigTable],
         posBigTable, joinCacheSize,spillTableDesc, conf,
         !hasFilter(posBigTable), reporter);
@@ -160,7 +159,7 @@ public abstract class AbstractMapJoinOpe
   }
 
   // returns true if there are elements in key list and any of them is null
-  protected boolean hasAnyNulls(AbstractMapJoinKey key) {
+  protected boolean hasAnyNulls(MapJoinKey key) {
     return key.hasAnyNulls(nullsafes);
   }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Thu Sep 12 01:21:10 2013
@@ -58,6 +58,8 @@ public class ColumnInfo implements Seria
 
   private boolean isHiddenVirtualCol;
 
+  private String typeName;
+
   public ColumnInfo() {
   }
 
@@ -94,6 +96,7 @@ public class ColumnInfo implements Seria
     this.tabAlias = tabAlias;
     this.isVirtualCol = isVirtualCol;
     this.isHiddenVirtualCol = isHiddenVirtualCol;
+    this.typeName = getType().getTypeName();
   }
 
   public ColumnInfo(ColumnInfo columnInfo) {
@@ -104,6 +107,15 @@ public class ColumnInfo implements Seria
     this.isVirtualCol = columnInfo.getIsVirtualCol();
     this.isHiddenVirtualCol = columnInfo.isHiddenVirtualCol();
     this.setType(columnInfo.getType());
+    this.typeName = columnInfo.getType().getTypeName();
+  }
+
+  public String getTypeName() {
+    return this.typeName;
+  }
+
+  public void setTypeName(String typeName) {
+    this.typeName = typeName;
   }
 
   public TypeInfo getType() {
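
ColumnInfo now materializes the type name into a plain String field, assigned in both constructors and exposed through a getter/setter pair, presumably so the name is available directly in the (de)serialized form without consulting the TypeInfo object. A minimal sketch of the caching pattern (only the field and accessor names are taken from the diff; the rest is illustrative):

    import java.io.Serializable;

    // Sketch: cache a derived value in a simple serializable field so that
    // consumers of the serialized form can read it without re-deriving it.
    public class ColumnInfoSketch implements Serializable {
      private static final long serialVersionUID = 1L;

      private String typeName;      // cached copy of getType().getTypeName()

      public ColumnInfoSketch(String typeName) {
        this.typeName = typeName;   // set eagerly, as in both constructors above
      }

      public String getTypeName() { return typeName; }
      public void setTypeName(String typeName) { this.typeName = typeName; }
    }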

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -93,7 +93,7 @@ public abstract class CommonJoinOperator
   protected transient ArrayList<Object>[] dummyObj;
 
   // empty rows for each table
-  protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors;
+  protected transient RowContainer<List<Object>>[] dummyObjVectors;
 
   protected transient int totalSz; // total size of the composite object
 
@@ -108,7 +108,7 @@ public abstract class CommonJoinOperator
   // input is too large
   // to fit in memory
 
-  AbstractRowContainer<ArrayList<Object>>[] storage; // map b/w table alias
+  AbstractRowContainer<List<Object>>[] storage; // map b/w table alias
   // to RowContainer
   int joinEmitInterval = -1;
   int joinCacheSize = 0;
@@ -274,7 +274,7 @@ public abstract class CommonJoinOperator
       }
       dummyObj[pos] = nr;
       // there should be only 1 dummy object in the RowContainer
-      RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> values = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
 
@@ -283,7 +283,7 @@ public abstract class CommonJoinOperator
 
       // if serde is null, the input doesn't need to be spilled out
       // e.g., the output columns does not contains the input table
-      RowContainer rc = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter);
       storage[pos] = rc;
@@ -328,7 +328,7 @@ public abstract class CommonJoinOperator
   public void startGroup() throws HiveException {
     LOG.trace("Join: Starting new group");
     newGroupStarted = true;
-    for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+    for (AbstractRowContainer<List<Object>> alw : storage) {
       alw.clear();
     }
     super.startGroup();
@@ -443,7 +443,7 @@ public abstract class CommonJoinOperator
   private void genJoinObject() throws HiveException {
     boolean rightFirst = true;
     boolean hasFilter = hasFilter(order[0]);
-    AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[0]];
+    AbstractRowContainer<List<Object>> aliasRes = storage[order[0]];
     for (List<Object> rightObj = aliasRes.first(); rightObj != null; rightObj = aliasRes.next()) {
       boolean rightNull = rightObj == dummyObj[0];
       if (hasFilter) {
@@ -471,7 +471,7 @@ public abstract class CommonJoinOperator
       int right = joinCond.getRight();
 
       // search for match in the rhs table
-      AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[aliasNum]];
+      AbstractRowContainer<List<Object>> aliasRes = storage[order[aliasNum]];
 
       boolean done = false;
       boolean loopAgain = false;
@@ -641,8 +641,8 @@ public abstract class CommonJoinOperator
 
   private void genUniqueJoinObject(int aliasNum, int forwardCachePos)
       throws HiveException {
-    AbstractRowContainer<ArrayList<Object>> alias = storage[order[aliasNum]];
-    for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
+    AbstractRowContainer<List<Object>> alias = storage[order[aliasNum]];
+    for (List<Object> row = alias.first(); row != null; row = alias.next()) {
       int sz = joinValues[order[aliasNum]].size();
       int p = forwardCachePos;
       for (int j = 0; j < sz; j++) {
@@ -662,7 +662,7 @@ public abstract class CommonJoinOperator
     int p = 0;
     for (int i = 0; i < numAliases; i++) {
       int sz = joinValues[order[i]].size();
-      ArrayList<Object> obj = storage[order[i]].first();
+      List<Object> obj = storage[order[i]].first();
       for (int j = 0; j < sz; j++) {
         forwardCache[p++] = obj.get(j);
       }
@@ -684,7 +684,7 @@ public abstract class CommonJoinOperator
       boolean allOne = true;
       for (int i = 0; i < numAliases; i++) {
         Byte alias = order[i];
-        AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+        AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (alw.size() != 1) {
           allOne = false;
@@ -717,7 +717,7 @@ public abstract class CommonJoinOperator
       boolean hasEmpty = false;
       for (int i = 0; i < numAliases; i++) {
         Byte alias = order[i];
-        AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+        AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (noOuterJoin) {
           if (alw.size() == 0) {
@@ -737,7 +737,7 @@ public abstract class CommonJoinOperator
           } else {
             mayHasMoreThanOne = true;
             if (!hasEmpty) {
-              for (ArrayList<Object> row = alw.first(); row != null; row = alw.next()) {
+              for (List<Object> row = alw.first(); row != null; row = alw.next()) {
                 reportProgress();
                 if (hasAnyFiltered(alias, row)) {
                   hasEmpty = true;
@@ -784,7 +784,7 @@ public abstract class CommonJoinOperator
   @Override
   public void closeOp(boolean abort) throws HiveException {
     LOG.trace("Join Op close");
-    for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+    for (AbstractRowContainer<List<Object>> alw : storage) {
       if (alw != null) {
         alw.clear(); // clean up the temp files
       }
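
Here, and in AbstractMapJoinOperator above, container fields are retyped from RowContainer<ArrayList<Object>> to the interface type List<Object>. Coding against the interface lets the containers hold rows of any List implementation rather than one concrete class; a tiny self-contained illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class GenericsSketch {
      public static void main(String[] args) {
        // A container typed to the interface accepts any List implementation...
        List<List<Object>> storage = new ArrayList<List<Object>>();
        storage.add(new ArrayList<Object>(Arrays.asList((Object) "k", 1)));
        storage.add(Arrays.<Object>asList("k", 2));  // fixed-size list is fine too

        // ...whereas List<ArrayList<Object>> would reject the Arrays.asList row
        // at compile time, forcing a copy.
        for (List<Object> row : storage) {
          System.out.println(row);
        }
      }
    }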

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Thu Sep 12 01:21:10 2013
@@ -31,12 +31,12 @@ import org.apache.hadoop.hive.serde2.obj
  */
 public class ExprNodeColumnEvaluator extends ExprNodeEvaluator<ExprNodeColumnDesc> {
 
-  transient boolean simpleCase;
-  transient StructObjectInspector inspector;
-  transient StructField field;
-  transient StructObjectInspector[] inspectors;
-  transient StructField[] fields;
-  transient boolean[] unionField;
+  private transient boolean simpleCase;
+  private transient StructObjectInspector inspector;
+  private transient StructField field;
+  private transient StructObjectInspector[] inspectors;
+  private transient StructField[] fields;
+  private transient boolean[] unionField;
 
   public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
     super(expr);
@@ -61,18 +61,17 @@ public class ExprNodeColumnEvaluator ext
     fields = new StructField[names.length];
     unionField = new boolean[names.length];
     int unionIndex = -1;
-
     for (int i = 0; i < names.length; i++) {
       if (i == 0) {
         inspectors[0] = (StructObjectInspector) rowInspector;
       } else {
-        if (unionIndex != -1) {
+        if (unionIndex == -1) {
+          inspectors[i] = (StructObjectInspector) fields[i - 1]
+              .getFieldObjectInspector();
+        } else {
           inspectors[i] = (StructObjectInspector) (
               (UnionObjectInspector)fields[i-1].getFieldObjectInspector()).
               getObjectInspectors().get(unionIndex);
-        } else {
-          inspectors[i] = (StructObjectInspector) fields[i - 1]
-              .getFieldObjectInspector();
         }
       }
       // to support names like _colx:1._coly
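
This hunk inverts the unionIndex test so the common non-union branch comes first and narrows the inspector fields to private; behavior is unchanged. The comment about names like _colx:1._coly refers to paths where a :N segment selects alternative N of a union before descending into the next struct field. An illustrative parse of such a path (the real evaluator walks ObjectInspectors, not strings):

    public class UnionPathSketch {
      public static void main(String[] args) {
        // "_colx:1._coly" addresses field _coly inside alternative 1 of the
        // union held in struct field _colx.
        String name = "_colx:1._coly";
        for (String segment : name.split("\\.")) {
          int colon = segment.indexOf(':');
          if (colon == -1) {
            System.out.println("struct field: " + segment);
          } else {
            System.out.println("struct field: " + segment.substring(0, colon)
                + ", then union alternative " + segment.substring(colon + 1));
          }
        }
      }
    }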

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Thu Sep 12 01:21:10 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
     return OperatorType.EXTRACT;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Sep 12 01:21:10 2013
@@ -75,7 +75,7 @@ public class FetchOperator implements Se
 
   private boolean isNativeTable;
   private FetchWork work;
-  private Operator<?> operator;    // operator tree for processing row further (option)
+  protected Operator<?> operator;    // operator tree for processing row further (option)
   private int splitNum;
   private PartitionDesc currPart;
   private TableDesc currTbl;
@@ -396,7 +396,7 @@ public class FetchOperator implements Se
 
       splitNum = 0;
       serde = partDesc.getDeserializerClass().newInstance();
-      serde.initialize(job, partDesc.getProperties());
+      serde.initialize(job, partDesc.getOverlayedProperties());
 
       if (currTbl != null) {
         tblSerde = serde;
@@ -408,7 +408,7 @@ public class FetchOperator implements Se
 
       ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
           serde.getObjectInspector(),
-          partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI);
+          partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI, true);
 
       partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
           serde.getObjectInspector(), outputOI);
@@ -416,7 +416,7 @@ public class FetchOperator implements Se
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating fetchTask with deserializer typeinfo: "
             + serde.getObjectInspector().getTypeName());
-        LOG.debug("deserializer properties: " + partDesc.getProperties());
+        LOG.debug("deserializer properties: " + partDesc.getOverlayedProperties());
       }
 
       if (currPart != null) {
@@ -495,6 +495,8 @@ public class FetchOperator implements Se
     InspectableObject row = getNextRow();
     if (row != null) {
       pushRow(row);
+    } else {
+      operator.flush();
     }
     return row != null;
   }
@@ -629,10 +631,10 @@ public class FetchOperator implements Se
       for (PartitionDesc listPart : listParts) {
         partition = listPart;
         Deserializer partSerde = listPart.getDeserializerClass().newInstance();
-        partSerde.initialize(job, listPart.getProperties());
+        partSerde.initialize(job, listPart.getOverlayedProperties());
 
         partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
-            partSerde.getObjectInspector(), tableOI);
+            partSerde.getObjectInspector(), tableOI, true);
         if (!partitionedTableOI.equals(tableOI)) {
           break;
         }
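
Two behavioral changes in FetchOperator: deserializers are now initialized from getOverlayedProperties() rather than getProperties(), presumably so partition-level properties overlay the table-level ones, and pushRow() now flushes the operator tree once input is exhausted so that buffering operators emit their pending rows. A sketch of the flush-on-exhaustion contract (the interface and names below are illustrative):

    public class FlushOnExhaustionSketch {
      interface RowSink {
        void push(Object row);
        void flush();  // emit anything buffered downstream
      }

      // Mirrors the new pushRow() behavior above: forward the row if there
      // is one, otherwise tell the operator tree to flush buffered state.
      static boolean pushRow(Object row, RowSink operator) {
        if (row != null) {
          operator.push(row);
        } else {
          operator.flush();
        }
        return row != null;
      }
    }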

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Thu Sep 12 01:21:10 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
     return OperatorType.FORWARD;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Sep 12 01:21:10 2013
@@ -23,7 +23,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -149,8 +149,12 @@ import org.apache.hadoop.hive.ql.udf.xml
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -478,7 +482,7 @@ public final class FunctionRegistry {
       Class<? extends UDF> UDFClass, boolean isOperator, String displayName) {
     if (UDF.class.isAssignableFrom(UDFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, displayName,
-          new GenericUDFBridge(displayName, isOperator, UDFClass));
+          new GenericUDFBridge(displayName, isOperator, UDFClass.getName()));
       mFunctions.put(functionName.toLowerCase(), fI);
     } else {
       throw new RuntimeException("Registering UDF Class " + UDFClass
@@ -597,24 +601,53 @@ public final class FunctionRegistry {
     return synonyms;
   }
 
-  static Map<TypeInfo, Integer> numericTypes = new HashMap<TypeInfo, Integer>();
-  static List<TypeInfo> numericTypeList = new ArrayList<TypeInfo>();
-
-  static void registerNumericType(String typeName, int level) {
-    TypeInfo t = TypeInfoFactory.getPrimitiveTypeInfo(typeName);
-    numericTypeList.add(t);
-    numericTypes.put(t, level);
+  // The ordering of types here is used to determine which numeric types
+  // are common/convertible to one another. Probably better to rely on the
+  // ordering explicitly defined here than to assume that the enum values
+  // that were arbitrarily assigned in PrimitiveCategory work for our purposes.
+  static EnumMap<PrimitiveCategory, Integer> numericTypes =
+      new EnumMap<PrimitiveCategory, Integer>(PrimitiveCategory.class);
+  static List<PrimitiveCategory> numericTypeList = new ArrayList<PrimitiveCategory>();
+
+  static void registerNumericType(PrimitiveCategory primitiveCategory, int level) {
+    numericTypeList.add(primitiveCategory);
+    numericTypes.put(primitiveCategory, level);
   }
 
   static {
-    registerNumericType(serdeConstants.TINYINT_TYPE_NAME, 1);
-    registerNumericType(serdeConstants.SMALLINT_TYPE_NAME, 2);
-    registerNumericType(serdeConstants.INT_TYPE_NAME, 3);
-    registerNumericType(serdeConstants.BIGINT_TYPE_NAME, 4);
-    registerNumericType(serdeConstants.FLOAT_TYPE_NAME, 5);
-    registerNumericType(serdeConstants.DOUBLE_TYPE_NAME, 6);
-    registerNumericType(serdeConstants.DECIMAL_TYPE_NAME, 7);
-    registerNumericType(serdeConstants.STRING_TYPE_NAME, 8);
+    registerNumericType(PrimitiveCategory.BYTE, 1);
+    registerNumericType(PrimitiveCategory.SHORT, 2);
+    registerNumericType(PrimitiveCategory.INT, 3);
+    registerNumericType(PrimitiveCategory.LONG, 4);
+    registerNumericType(PrimitiveCategory.FLOAT, 5);
+    registerNumericType(PrimitiveCategory.DOUBLE, 6);
+    registerNumericType(PrimitiveCategory.DECIMAL, 7);
+    registerNumericType(PrimitiveCategory.STRING, 8);
+  }
+
+  /**
+   * Given 2 TypeInfo types and the PrimitiveCategory selected as the common class between the two,
+   * return a TypeInfo corresponding to the common PrimitiveCategory, and with type qualifiers
+   * (if applicable) that match the 2 TypeInfo types.
+   * Examples:
+   *   varchar(10), varchar(20), primitive category varchar => varchar(20)
+   *   date, string, primitive category string => string
+   * @param a  TypeInfo of the first type
+   * @param b  TypeInfo of the second type
+   * @param typeCategory PrimitiveCategory of the designated common type between a and b
+   * @return TypeInfo represented by the primitive category, with any applicable type qualifiers.
+   */
+  public static TypeInfo getTypeInfoForPrimitiveCategory(
+      PrimitiveTypeInfo a, PrimitiveTypeInfo b, PrimitiveCategory typeCategory) {
+    // For types with parameters (like varchar), we need to determine the type parameters
+    // that should be added to this type, based on the original 2 TypeInfos.
+    switch (typeCategory) {
+
+      default:
+        // Type doesn't require any qualifiers.
+        return TypeInfoFactory.getPrimitiveTypeInfo(
+          PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveCategory(typeCategory).typeName);
+    }
   }
 
   /**
@@ -624,18 +657,38 @@ public final class FunctionRegistry {
     if (a.equals(b)) {
       return a;
     }
+    if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+      return null;
+    }
+    PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+    PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+    if (pcA == pcB) {
+      // Same primitive category but different qualifiers.
+      return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
+    }
+
+    PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+    PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+    // handle string types properly
+    if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+      return getTypeInfoForPrimitiveCategory(
+          (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+    }
+
     if (FunctionRegistry.implicitConvertable(a, b)) {
-      return b;
+      return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcB);
     }
     if (FunctionRegistry.implicitConvertable(b, a)) {
-      return a;
+      return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
     }
-    for (TypeInfo t : numericTypeList) {
-      if (FunctionRegistry.implicitConvertable(a, t)
-          && FunctionRegistry.implicitConvertable(b, t)) {
-        return t;
+    for (PrimitiveCategory t : numericTypeList) {
+      if (FunctionRegistry.implicitConvertable(pcA, t)
+          && FunctionRegistry.implicitConvertable(pcB, t)) {
+        return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, t);
       }
     }
+
     return null;
   }
 
@@ -653,12 +706,34 @@ public final class FunctionRegistry {
     if (a.equals(b)) {
       return a;
     }
-    for (TypeInfo t : numericTypeList) {
-      if (FunctionRegistry.implicitConvertable(a, t)
-          && FunctionRegistry.implicitConvertable(b, t)) {
-        return t;
+    if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+      return null;
+    }
+    PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+    PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+    if (pcA == pcB) {
+      // Same primitive category but different qualifiers.
+      // Rely on getTypeInfoForPrimitiveCategory() to sort out the type params.
+      return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
+    }
+
+    PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+    PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+    // handle string types properly
+    if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+      // Compare as strings. Char comparison semantics may be different if/when implemented.
+      return getTypeInfoForPrimitiveCategory(
+          (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+    }
+
+    for (PrimitiveCategory t : numericTypeList) {
+      if (FunctionRegistry.implicitConvertable(pcA, t)
+          && FunctionRegistry.implicitConvertable(pcB, t)) {
+        return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, t);
       }
     }
+
     return null;
   }
 
@@ -674,45 +749,60 @@ public final class FunctionRegistry {
     if (a.equals(b)) {
       return a;
     }
-    Integer ai = numericTypes.get(a);
-    Integer bi = numericTypes.get(b);
+    if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+      return null;
+    }
+    PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+    PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+    PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+    PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+    // handle string types properly
+    if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+      return getTypeInfoForPrimitiveCategory(
+          (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+    }
+
+    Integer ai = numericTypes.get(pcA);
+    Integer bi = numericTypes.get(pcB);
     if (ai == null || bi == null) {
       // If either is not a numeric type, return null.
       return null;
     }
-    return (ai > bi) ? a : b;
+    PrimitiveCategory pcCommon = (ai > bi) ? pcA : pcB;
+    return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcCommon);
   }
 
-  /**
-   * Returns whether it is possible to implicitly convert an object of Class
-   * from to Class to.
-   */
-  public static boolean implicitConvertable(TypeInfo from, TypeInfo to) {
-    if (from.equals(to)) {
+  public static boolean implicitConvertable(PrimitiveCategory from, PrimitiveCategory to) {
+    if (from == to) {
       return true;
     }
+
+    PrimitiveGrouping fromPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(from);
+    PrimitiveGrouping toPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(to);
+
     // Allow implicit String to Double conversion
-    if (from.equals(TypeInfoFactory.stringTypeInfo)
-        && to.equals(TypeInfoFactory.doubleTypeInfo)) {
+    if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DOUBLE) {
       return true;
     }
     // Allow implicit String to Decimal conversion
-    if (from.equals(TypeInfoFactory.stringTypeInfo)
-        && to.equals(TypeInfoFactory.decimalTypeInfo)) {
+    if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DECIMAL) {
       return true;
     }
     // Void can be converted to any type
-    if (from.equals(TypeInfoFactory.voidTypeInfo)) {
+    if (from == PrimitiveCategory.VOID) {
       return true;
     }
     // Allow implicit String to Date conversion
-    if (from.equals(TypeInfoFactory.dateTypeInfo)
-        && to.equals(TypeInfoFactory.stringTypeInfo)) {
+    if (fromPg == PrimitiveGrouping.DATE_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
       return true;
     }
-
-    if (from.equals(TypeInfoFactory.timestampTypeInfo)
-        && to.equals(TypeInfoFactory.stringTypeInfo)) {
+    // Allow implicit Numeric to String conversion
+    if (fromPg == PrimitiveGrouping.NUMERIC_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
+      return true;
+    }
+    // Allow implicit String to varchar conversion, and vice versa
+    if (fromPg == PrimitiveGrouping.STRING_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
       return true;
     }
 
@@ -730,6 +820,27 @@ public final class FunctionRegistry {
   }
 
   /**
+   * Returns whether it is possible to implicitly convert an object of Class
+   * from to Class to.
+   */
+  public static boolean implicitConvertable(TypeInfo from, TypeInfo to) {
+    if (from.equals(to)) {
+      return true;
+    }
+
+    // Reimplemented to use PrimitiveCategory rather than TypeInfo, because
+    // 2 TypeInfos from the same qualified type (varchar, decimal) should still be
+    // seen as equivalent.
+    if (from.getCategory() == Category.PRIMITIVE && to.getCategory() == Category.PRIMITIVE) {
+      return implicitConvertable(
+          ((PrimitiveTypeInfo)from).getPrimitiveCategory(),
+          ((PrimitiveTypeInfo)to).getPrimitiveCategory());
+    }
+    return false;
+  }
+
+
+  /**
    * Get the GenericUDAF evaluator for the name and argumentClasses.
    *
    * @param name
@@ -956,6 +1067,59 @@ public final class FunctionRegistry {
   }
 
   /**
+   * Given a set of candidate methods and list of argument types, try to
+   * select the best candidate based on how close the passed argument types are
+   * to the candidate argument types.
+   * For a varchar argument, we would prefer evaluate(string) over evaluate(double).
+   * @param udfMethods  list of candidate methods
+   * @param argumentsPassed list of argument types to match to the candidate methods
+   */
+  static void filterMethodsByTypeAffinity(List<Method> udfMethods, List<TypeInfo> argumentsPassed) {
+    if (udfMethods.size() > 1) {
+      // Prefer methods with a closer signature based on the primitive grouping of each argument.
+      // Score each method based on its similarity to the passed argument types.
+      int currentScore = 0;
+      int bestMatchScore = 0;
+      Method bestMatch = null;
+      for (Method m: udfMethods) {
+        currentScore = 0;
+        List<TypeInfo> argumentsAccepted =
+            TypeInfoUtils.getParameterTypeInfos(m, argumentsPassed.size());
+        Iterator<TypeInfo> argsPassedIter = argumentsPassed.iterator();
+        for (TypeInfo acceptedType : argumentsAccepted) {
+          // Check the affinity of the argument passed in with the accepted argument,
+          // based on the PrimitiveGrouping
+          TypeInfo passedType = argsPassedIter.next();
+          if (acceptedType.getCategory() == Category.PRIMITIVE
+              && passedType.getCategory() == Category.PRIMITIVE) {
+            PrimitiveGrouping acceptedPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+                ((PrimitiveTypeInfo) acceptedType).getPrimitiveCategory());
+            PrimitiveGrouping passedPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+                ((PrimitiveTypeInfo) passedType).getPrimitiveCategory());
+            if (acceptedPg == passedPg) {
+              // The passed argument matches somewhat closely with an accepted argument
+              ++currentScore;
+            }
+          }
+        }
+        // Check if the score for this method is any better relative to others
+        if (currentScore > bestMatchScore) {
+          bestMatchScore = currentScore;
+          bestMatch = m;
+        } else if (currentScore == bestMatchScore) {
+          bestMatch = null; // no longer a best match if more than one.
+        }
+      }
+
+      if (bestMatch != null) {
+        // Found a best match during this processing, use it.
+        udfMethods.clear();
+        udfMethods.add(bestMatch);
+      }
+    }
+  }
+
+  /**
    * Gets the closest matching method corresponding to the argument list from a
    * list of methods.
    *
@@ -1025,6 +1189,13 @@ public final class FunctionRegistry {
       // No matching methods found
       throw new NoMatchingMethodException(udfClass, argumentsPassed, mlist);
     }
+
+    if (udfMethods.size() > 1) {
+      // First try selecting methods based on the type affinity of the arguments passed
+      // to the candidate method arguments.
+      filterMethodsByTypeAffinity(udfMethods, argumentsPassed);
+    }
+
     if (udfMethods.size() > 1) {
 
       // if the only difference is numeric types, pick the method
@@ -1050,9 +1221,15 @@ public final class FunctionRegistry {
         for (TypeInfo accepted: argumentsAccepted) {
           TypeInfo reference = referenceIterator.next();
 
-          if (numericTypes.containsKey(accepted)) {
+          boolean acceptedIsPrimitive = false;
+          PrimitiveCategory acceptedPrimCat = PrimitiveCategory.UNKNOWN;
+          if (accepted.getCategory() == Category.PRIMITIVE) {
+            acceptedIsPrimitive = true;
+            acceptedPrimCat = ((PrimitiveTypeInfo) accepted).getPrimitiveCategory();
+          }
+          if (acceptedIsPrimitive && numericTypes.containsKey(acceptedPrimCat)) {
             // We're looking for the udf with the smallest maximum numeric type.
-            int typeValue = numericTypes.get(accepted);
+            int typeValue = numericTypes.get(acceptedPrimCat);
             maxNumericType = typeValue > maxNumericType ? typeValue : maxNumericType;
           } else if (!accepted.equals(reference)) {
             // There are non-numeric arguments that don't match from one UDF to
@@ -1107,7 +1284,7 @@ public final class FunctionRegistry {
     if (genericUDF instanceof GenericUDFBridge) {
       GenericUDFBridge bridge = (GenericUDFBridge) genericUDF;
       return new GenericUDFBridge(bridge.getUdfName(), bridge.isOperator(),
-          bridge.getUdfClass());
+          bridge.getUdfClassName());
     } else if (genericUDF instanceof GenericUDFMacro) {
       GenericUDFMacro bridge = (GenericUDFMacro) genericUDF;
       return new GenericUDFMacro(bridge.getMacroName(), bridge.getBody(),
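
The numeric precedence table in FunctionRegistry is rekeyed from TypeInfo to PrimitiveCategory (BYTE=1 through STRING=8), and the common-type routines now short-circuit on equal primitive categories and on two STRING_GROUP types before walking the precedence list. A self-contained sketch of that precedence walk; the convertible() rule below is a deliberate simplification of implicitConvertable(), which also admits cross-group conversions such as string to double:

    import java.util.ArrayList;
    import java.util.EnumMap;
    import java.util.List;

    public class CommonTypeSketch {
      enum Cat { BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL, STRING }

      static final EnumMap<Cat, Integer> LEVEL = new EnumMap<Cat, Integer>(Cat.class);
      static final List<Cat> ORDER = new ArrayList<Cat>();
      static {
        // Same ordering the hunk registers: TINYINT(1) ... STRING(8).
        int level = 1;
        for (Cat c : Cat.values()) {
          LEVEL.put(c, level++);
          ORDER.add(c);
        }
      }

      // Simplified stand-in for implicitConvertable(): a numeric type widens
      // to any type at the same or a higher precedence level.
      static boolean convertible(Cat from, Cat to) {
        return LEVEL.get(from) <= LEVEL.get(to);
      }

      // First category both arguments convert to, as in the loop over
      // numericTypeList above.
      static Cat commonClass(Cat a, Cat b) {
        for (Cat t : ORDER) {
          if (convertible(a, t) && convertible(b, t)) {
            return t;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(commonClass(Cat.INT, Cat.FLOAT));      // FLOAT
        System.out.println(commonClass(Cat.BYTE, Cat.LONG));      // LONG
        System.out.println(commonClass(Cat.DOUBLE, Cat.DECIMAL)); // DECIMAL
      }
    }

The new getTypeInfoForPrimitiveCategory() then re-applies type qualifiers, so varchar(10) and varchar(20) resolve to varchar(20) rather than to a bare category.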

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Sep 12 01:21:10 2013
@@ -60,7 +60,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
@@ -180,7 +179,7 @@ public class GroupByOperator extends Ope
    * Current number of entries in the hash table.
    */
   protected transient int numEntriesHashTable;
-  transient int countAfterReport;
+  transient int countAfterReport;   // report or forward
   transient int heartbeatInterval;
 
   public static FastBitSet groupingSet2BitSet(int value) {
@@ -244,15 +243,15 @@ public class GroupByOperator extends Ope
     // reduce KEY has union field as the last field if there are distinct
     // aggregates in group-by.
     List<? extends StructField> sfs =
-      ((StandardStructObjectInspector) rowInspector).getAllStructFieldRefs();
+      ((StructObjectInspector) rowInspector).getAllStructFieldRefs();
     if (sfs.size() > 0) {
       StructField keyField = sfs.get(0);
       if (keyField.getFieldName().toUpperCase().equals(
           Utilities.ReduceField.KEY.name())) {
         ObjectInspector keyObjInspector = keyField.getFieldObjectInspector();
-        if (keyObjInspector instanceof StandardStructObjectInspector) {
+        if (keyObjInspector instanceof StructObjectInspector) {
           List<? extends StructField> keysfs =
-            ((StandardStructObjectInspector) keyObjInspector).getAllStructFieldRefs();
+            ((StructObjectInspector) keyObjInspector).getAllStructFieldRefs();
           if (keysfs.size() > 0) {
             // the last field is the union field, if any
             StructField sf = keysfs.get(keysfs.size() - 1);
@@ -1197,4 +1196,15 @@ public class GroupByOperator extends Ope
   public OperatorType getType() {
     return OperatorType.GROUPBY;
   }
+
+  /**
+   * we can push the limit above GBY (running in Reducer), since that will generate single row
+   * for each group. This doesn't necessarily hold for GBY (running in Mappers),
+   * so we don't push limit above it.
+   */
+  @Override
+  public boolean acceptLimitPushdown() {
+    return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+        getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+  }
 }
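
acceptLimitPushdown() (also added to ExtractOperator and ForwardOperator above) lets an optimizer decide how far a LIMIT may sink. Reduce-side group-by emits exactly one row per group, so for a query like SELECT key, count(*) FROM t GROUP BY key LIMIT 10 the limit can sit directly above a MERGEPARTIAL or COMPLETE group-by, while a map-side HASH-mode group-by gives no such guarantee. A sketch of how a walk might consume the flag (illustrative names, not Hive's actual optimizer API):

    import java.util.Arrays;
    import java.util.List;

    public class LimitPushdownSketch {
      interface Op {
        boolean acceptLimitPushdown();
        String name();
      }

      static Op op(final String name, final boolean accepts) {
        return new Op() {
          public boolean acceptLimitPushdown() { return accepts; }
          public String name() { return name; }
        };
      }

      // Walk upstream from the LIMIT: the limit may be re-anchored directly
      // above each successive operator that accepts pushdown, and stops
      // before the first one that does not.
      static String findLimitAnchor(List<Op> upstream) {
        String anchor = "LIMIT stays put";
        for (Op op : upstream) {
          if (!op.acceptLimitPushdown()) {
            break;
          }
          anchor = "above " + op.name();
        }
        return anchor;
      }

      public static void main(String[] args) {
        List<Op> upstream = Arrays.asList(
            op("EXTRACT", true), op("GBY-MERGEPARTIAL", true), op("RS", false));
        System.out.println(findLimitAnchor(upstream));  // above GBY-MERGEPARTIAL
      }
    }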

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java Thu Sep 12 01:21:10 2013
@@ -38,7 +38,7 @@ public class HashTableDummyOperator exte
       this.outputObjInspector = serde.getObjectInspector();
       initializeChildren(hconf);
     } catch (Exception e) {
-      LOG.error("Generating output obj inspector from dummy object error");
+      LOG.error("Generating output obj inspector from dummy object error", e);
       e.printStackTrace();
     }
   }
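
Passing the caught exception as the second argument makes the logger record the stack trace along with the message (which also makes the retained e.printStackTrace() redundant). A minimal demonstration with the same commons-logging API used above:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingSketch {
      private static final Log LOG = LogFactory.getLog(LoggingSketch.class);

      public static void main(String[] args) {
        try {
          throw new IllegalStateException("boom");
        } catch (Exception e) {
          // Message only: the stack trace is lost.
          LOG.error("Generating output obj inspector from dummy object error");
          // Message plus throwable: the logger records the stack trace too.
          LOG.error("Generating output obj inspector from dummy object error", e);
        }
      }
    }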

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Thu Sep 12 01:21:10 2013
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.File;
+import java.io.BufferedOutputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,11 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -41,10 +44,8 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -54,128 +55,57 @@ public class HashTableSinkOperator exten
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
-  protected static MapJoinMetaData metadata = new MapJoinMetaData();
-  // from abstract map join operator
   /**
    * The expressions for join inputs's join keys.
    */
-  protected transient List<ExprNodeEvaluator>[] joinKeys;
+  private transient List<ExprNodeEvaluator>[] joinKeys;
   /**
    * The ObjectInspectors for the join inputs's join keys.
    */
-  protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
-  /**
-   * The standard ObjectInspectors for the join inputs's join keys.
-   */
-  protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
-
-  protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
+  private transient List<ObjectInspector>[] joinKeysObjectInspectors;
 
-  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+  private transient int posBigTableAlias = -1; // one of the tables that is not in memory
 
-  transient int numMapRowsRead;
-  protected transient int totalSz; // total size of the composite object
-  transient boolean firstRow;
   /**
    * The filters for join
    */
-  protected transient List<ExprNodeEvaluator>[] joinFilters;
+  private transient List<ExprNodeEvaluator>[] joinFilters;  
 
-  protected transient int[][] filterMaps;
+  private transient int[][] filterMaps;
 
-  protected transient int numAliases; // number of aliases
   /**
    * The expressions for join outputs.
    */
-  protected transient List<ExprNodeEvaluator>[] joinValues;
+  private transient List<ExprNodeEvaluator>[] joinValues;
   /**
    * The ObjectInspectors for the join inputs.
    */
-  protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
+  private transient List<ObjectInspector>[] joinValuesObjectInspectors;
   /**
    * The ObjectInspectors for join filters.
    */
-  protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
-  /**
-   * The standard ObjectInspectors for the join inputs.
-   */
-  protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
+  private transient List<ObjectInspector>[] joinFilterObjectInspectors;
 
-  protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
-
-  protected transient Byte[] order; // order in which the results should
-  Configuration hconf;
-  protected transient Byte alias;
-  protected transient TableDesc[] spillTableDesc; // spill tables are
-
-  protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-  protected transient boolean noOuterJoin;
+  private transient Byte[] order; // order in which the results should
+  private Configuration hconf;
+  private transient Byte alias;
+
+  private transient MapJoinTableContainer[] mapJoinTables;
+  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;  
+
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+  private static final MapJoinRowContainer EMPTY_ROW_CONTAINER = new MapJoinRowContainer();
+  static {
+    EMPTY_ROW_CONTAINER.add(EMPTY_OBJECT_ARRAY);
+  }
+  
+  private transient boolean noOuterJoin;
 
   private long rowNumber = 0;
-  protected transient LogHelper console;
+  private transient LogHelper console;
   private long hashTableScale;
-  private boolean isAbort = false;
-
-  public static class HashTableSinkObjectCtx {
-    ObjectInspector standardOI;
-    SerDe serde;
-    TableDesc tblDesc;
-    Configuration conf;
-    boolean hasFilter;
-
-    /**
-     * @param standardOI
-     * @param serde
-     */
-    public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
-        boolean hasFilter, Configuration conf) {
-      this.standardOI = standardOI;
-      this.serde = serde;
-      this.tblDesc = tblDesc;
-      this.hasFilter = hasFilter;
-      this.conf = conf;
-    }
-
-    /**
-     * @return the standardOI
-     */
-    public ObjectInspector getStandardOI() {
-      return standardOI;
-    }
-
-    /**
-     * @return the serde
-     */
-    public SerDe getSerDe() {
-      return serde;
-    }
-
-    public TableDesc getTblDesc() {
-      return tblDesc;
-    }
-
-    public boolean hasFilterTag() {
-      return hasFilter;
-    }
-
-    public Configuration getConf() {
-      return conf;
-    }
-
-  }
-
-  public static MapJoinMetaData getMetadata() {
-    return metadata;
-  }
-
-  private static final transient String[] FATAL_ERR_MSG = {
-      null, // counter value 0 means no error
-      "Mapside join exceeds available memory. "
-          + "Please try removing the mapjoin hint."};
-  private final int metadataKeyTag = -1;
-  transient int[] metadataValueTag;
-
-
+  private transient MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+  
   public HashTableSinkOperator() {
   }
 
@@ -189,8 +119,7 @@ public class HashTableSinkOperator exten
   protected void initializeOp(Configuration hconf) throws HiveException {
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    numMapRowsRead = 0;
-    firstRow = true;
+    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
 
     // for small tables only; so get the big table position first
     posBigTableAlias = conf.getPosBigTable();
@@ -198,9 +127,7 @@ public class HashTableSinkOperator exten
     order = conf.getTagOrder();
 
     // initialize some variables, which used to be initialized in CommonJoinOperator
-    numAliases = conf.getExprs().size();
     this.hconf = hconf;
-    totalSz = 0;
 
     noOuterJoin = conf.isNoOuterJoin();
     filterMaps = conf.getFilterMap();
@@ -212,16 +139,12 @@ public class HashTableSinkOperator exten
     JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
     joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
         inputObjInspectors, posBigTableAlias, tagLen);
-    joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
-        joinKeysObjectInspectors, posBigTableAlias, tagLen);
 
     // process join values
     joinValues = new List[tagLen];
     JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
     joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
         inputObjInspectors, posBigTableAlias, tagLen);
-    joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
-        joinValuesObjectInspectors, posBigTableAlias, tagLen);
 
     // process join filters
     joinFilters = new List[tagLen];
@@ -229,9 +152,7 @@ public class HashTableSinkOperator exten
     joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
         inputObjInspectors, posBigTableAlias, tagLen);
 
-    if (noOuterJoin) {
-      rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
-    } else {
+    if (!noOuterJoin) {
       List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
       for (Byte alias : order) {
         if (alias == posBigTableAlias) {
@@ -245,43 +166,43 @@ public class HashTableSinkOperator exten
         }
         rowContainerObjectInspectors[alias] = rcOIs;
       }
-      rowContainerStandardObjectInspectors = getStandardObjectInspectors(
-          rowContainerObjectInspectors, tagLen);
-    }
-
-    metadataValueTag = new int[numAliases];
-    for (int pos = 0; pos < numAliases; pos++) {
-      metadataValueTag[pos] = -1;
     }
-    mapJoinTables = new HashMapWrapper[tagLen];
-
+    mapJoinTables = new MapJoinTableContainer[tagLen];
+    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
     float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
         HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
-    float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
-
     hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
     if (hashTableScale <= 0) {
       hashTableScale = 1;
     }
-
-    // initialize the hash tables for other tables
-    for (Byte pos : order) {
-      if (pos == posBigTableAlias) {
-        continue;
+    try {
+      TableDesc keyTableDesc = conf.getKeyTblDesc();
+      SerDe keySerde = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+          null);
+      keySerde.initialize(null, keyTableDesc.getProperties());
+      MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false);
+      for (Byte pos : order) {
+        if (pos == posBigTableAlias) {
+          continue;
+        }
+        mapJoinTables[pos] = new HashMapWrapper(hashTableThreshold, hashTableLoadFactor);
+        TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(pos);
+        SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+        valueSerDe.initialize(null, valueTableDesc.getProperties());
+        mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, new MapJoinObjectSerDeContext(
+            valueSerDe, hasFilter(pos)));
       }
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
-          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
-
-      mapJoinTables[pos] = hashTable;
+    } catch (SerDeException e) {
+      throw new HiveException(e);
     }
   }
 
 
 
-  protected static List<ObjectInspector>[] getStandardObjectInspectors(
+  private static List<ObjectInspector>[] getStandardObjectInspectors(
       List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+    @SuppressWarnings("unchecked")
     List<ObjectInspector>[] result = new List[maxTag];
     for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
       List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
@@ -299,104 +220,45 @@ public class HashTableSinkOperator exten
 
   }
 
-  private void setKeyMetaData() throws SerDeException {
-    TableDesc keyTableDesc = conf.getKeyTblDesc();
-    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
-        null);
-    keySerializer.initialize(null, keyTableDesc.getProperties());
-
-    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
-        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
-            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-  }
-
-  private boolean hasFilter(int alias) {
-    return filterMaps != null && filterMaps[alias] != null;
-  }
   /*
   * This operator only processes the small tables: it reads the key/value pairs and loads them into the hashtable.
    */
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    // let the mapJoinOp process these small tables
-    try {
-      if (firstRow) {
-        // generate the map metadata
-        setKeyMetaData();
-        firstRow = false;
-      }
-      alias = (byte)tag;
-
-      // compute keys and values as StandardObjects
-      AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
-          joinKeysObjectInspectors[alias]);
-
-      Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
-          joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
-          filterMaps == null ? null : filterMaps[alias]);
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[alias];
-
-      MapJoinObjectValue o = hashTable.get(keyMap);
-      MapJoinRowContainer<Object[]> res = null;
-
-      boolean needNewKey = true;
-      if (o == null) {
-        res = new MapJoinRowContainer<Object[]>();
-        res.add(value);
-
-        if (metadataValueTag[tag] == -1) {
-          metadataValueTag[tag] = order[tag];
-          setValueMetaData(tag);
-        }
-
-        // Construct externalizable objects for key and value
-        if (needNewKey) {
-          MapJoinObjectValue valueObj = new MapJoinObjectValue(
-              metadataValueTag[tag], res);
-
-          rowNumber++;
-          if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-            isAbort = hashTable.isAbort(rowNumber, console);
-            if (isAbort) {
-              throw new HiveException("RunOutOfMeomoryUsage");
-            }
-          }
-          hashTable.put(keyMap, valueObj);
-        }
-
+    alias = (byte)tag;
+    // compute keys and values as StandardObjects
+    MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
+        joinKeysObjectInspectors[alias]);
+    Object[] value = EMPTY_OBJECT_ARRAY;
+    if ((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
+      value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+        joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
+        filterMaps == null ? null : filterMaps[alias]);
+    }
+    MapJoinTableContainer tableContainer = mapJoinTables[alias];
+    MapJoinRowContainer rowContainer = tableContainer.get(key);
+    if (rowContainer == null) {
+      if (value.length != 0) {
+        rowContainer = new MapJoinRowContainer();
+        rowContainer.add(value);
       } else {
-        res = o.getObj();
-        res.add(value);
+        rowContainer = EMPTY_ROW_CONTAINER;
       }
-
-
-    } catch (SerDeException e) {
-      throw new HiveException(e);
+      rowNumber++;
+      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
+      }
+      tableContainer.put(key, rowContainer);
+    } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+      rowContainer = rowContainer.copy();
+      rowContainer.add(value);
+      tableContainer.put(key, rowContainer);
+    } else {
+      rowContainer.add(value);
     }
-
   }
-
-  private void setValueMetaData(int tag) throws SerDeException {
-    TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
-    SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
-        null);
-
-    valueSerDe.initialize(null, valueTableDesc.getProperties());
-
-    List<ObjectInspector> newFields = rowContainerStandardObjectInspectors[alias];
-    int length = newFields.size();
-    List<String> newNames = new ArrayList<String>(length);
-    for (int i = 0; i < length; i++) {
-      String tmp = new String("tmp_" + i);
-      newNames.add(tmp);
-    }
-    StandardStructObjectInspector standardOI = ObjectInspectorFactory
-        .getStandardStructObjectInspector(newNames, newFields);
-
-    int alias = Integer.valueOf(metadataValueTag[tag]);
-    metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
-        standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
+
+  private boolean hasFilter(int alias) {
+    return filterMaps != null && filterMaps[alias] != null;
   }
 
   @Override
@@ -405,42 +267,36 @@ public class HashTableSinkOperator exten
       if (mapJoinTables != null) {
         // get tmp file URI
         String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
-        LOG.info("Get TMP URI: " + tmpURI);
-        long fileLength;
+        LOG.info("Temp URI for side table: " + tmpURI);
         for (byte tag = 0; tag < mapJoinTables.length; tag++) {
           // get the key and value
-          HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[tag];
-          if (hashTable == null) {
+          MapJoinTableContainer tableContainer = mapJoinTables[tag];
+          if (tableContainer == null) {
             continue;
           }
-
           // get current input file name
           String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
-
           String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
-
           // get the tmp URI path; it will be an HDFS path if not in local mode
           String dumpFilePrefix = conf.getDumpFilePrefix();
           String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
-          hashTable.isAbort(rowNumber, console);
-          console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
+          console.printInfo(Utilities.now() + "\tDumping the side table to file: " + tmpURIPath);
           // get the hashtable file and path
           Path path = new Path(tmpURIPath);
           FileSystem fs = path.getFileSystem(hconf);
-          File file = new File(path.toUri().getPath());
-          fs.create(path);
-          fileLength = hashTable.flushMemoryCacheToPersistent(file);
-          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
-              + fileLength);
-
-          hashTable.close();
+          ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+          try {
+            mapJoinTableSerdes[tag].persist(out, tableContainer);
+          } finally {
+            out.close();
+          }
+          tableContainer.clear();
+          console.printInfo(Utilities.now() + "\tUploaded 1 file to: " + tmpURIPath);
         }
       }
-
       super.closeOp(abort);
     } catch (Exception e) {
-      LOG.error("Generate Hashtable error", e);
-      e.printStackTrace();
+      LOG.error("Error generating side table", e);
+      throw new HiveException(e);
     }
   }
 

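In the new processOp above, a key whose small-table rows carry no value columns is
mapped to the shared EMPTY_ROW_CONTAINER singleton instead of a freshly allocated
container; only when a second row arrives for the same key is the singleton copied
into a private, mutable container. Below is a minimal, self-contained sketch of that
copy-on-write sharing pattern; the class and field names are illustrative stand-ins,
not the Hive types:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CopyOnWriteSketch {
      // One shared, logically immutable container meaning "key present, no values".
      static final List<Object[]> EMPTY = new ArrayList<Object[]>();
      static { EMPTY.add(new Object[0]); }

      static final Map<String, List<Object[]>> table = new HashMap<String, List<Object[]>>();

      static void put(String key, Object[] value) {
        List<Object[]> rows = table.get(key);
        if (rows == null) {
          if (value.length != 0) {
            rows = new ArrayList<Object[]>();
            rows.add(value);
          } else {
            rows = EMPTY; // share the singleton rather than allocating per key
          }
          table.put(key, rows);
        } else if (rows == EMPTY) {
          rows = new ArrayList<Object[]>(rows); // copy on the first real mutation
          rows.add(value);
          table.put(key, rows);
        } else {
          rows.add(value);
        }
      }

      public static void main(String[] args) {
        put("k1", new Object[0]);
        put("k2", new Object[0]);
        System.out.println(table.get("k1") == table.get("k2")); // true: both share EMPTY
        put("k1", new Object[0]);
        System.out.println(table.get("k1") == EMPTY);           // false: copied on write
        System.out.println(table.get("k1").size());             // 2: duplicate count kept
      }
    }

Sharing one immutable container keeps the per-key overhead near zero in the common
case where the small table contributes join keys but no value columns.
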
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Sep 12 01:21:10 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.LongWritable;
@@ -114,8 +113,8 @@ public class JoinOperator extends Common
 
       // Add the value to the vector
       // if join-key is null, process each row in different group.
-      StandardStructObjectInspector inspector =
-          (StandardStructObjectInspector) sf.getFieldObjectInspector();
+      StructObjectInspector inspector =
+          (StructObjectInspector) sf.getFieldObjectInspector();
       if (SerDeUtils.hasAnyNullObject(keyObject, inspector, nullsafes)) {
         endGroup();
         startGroup();

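The JoinOperator change above widens the cast from the concrete
StandardStructObjectInspector to the StructObjectInspector interface, so the
null-safe group-key check also works when the key is exposed through a lazy or
lazy-binary struct inspector. A small sketch of coding against the interface,
built with the stock serde2 factories (the demo class itself is hypothetical):

    import java.util.Arrays;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class StructOIDemo {
      public static void main(String[] args) {
        // A standard struct OI with one string field named "key"; lazy and
        // lazy-binary structs implement the same StructObjectInspector interface.
        StructObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("key"),
            Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        Object row = Arrays.asList((Object) "k1");
        for (StructField sf : oi.getAllStructFieldRefs()) {
          // Only interface methods are used, so any struct inspector works here.
          System.out.println(sf.getFieldName() + " = " + oi.getStructFieldData(row, sf));
        }
      }
    }
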
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Thu Sep 12 01:21:10 2013
@@ -23,10 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -147,42 +144,22 @@ public class JoinUtil {
 
   /**
    * Return the key as a standard object. StandardObject can be inspected by a
-   * standard ObjectInspector.
+   * standard ObjectInspector. The first parameter, a MapJoinKey, can
+   * be null if the caller would like a new object to be instantiated.
    */
-  public static AbstractMapJoinKey computeMapJoinKeys(Object row,
+  public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row,
       List<ExprNodeEvaluator> keyFields, List<ObjectInspector> keyFieldsOI)
       throws HiveException {
-
     int size = keyFields.size();
-    if(size == 1){
-      Object obj = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
-          .evaluate(row), keyFieldsOI.get(0),
-          ObjectInspectorCopyOption.WRITABLE));
-      MapJoinSingleKey key = new MapJoinSingleKey(obj);
-      return key;
-    }else if(size == 2){
-      Object obj1 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
-          .evaluate(row), keyFieldsOI.get(0),
-          ObjectInspectorCopyOption.WRITABLE));
-
-      Object obj2 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(1)
-          .evaluate(row), keyFieldsOI.get(1),
-          ObjectInspectorCopyOption.WRITABLE));
-
-      MapJoinDoubleKeys key = new MapJoinDoubleKeys(obj1,obj2);
-      return key;
-    }else{
-      // Compute the keys
-      Object[] nr = new Object[keyFields.size()];
-      for (int i = 0; i < keyFields.size(); i++) {
-
-        nr[i] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(i)
-            .evaluate(row), keyFieldsOI.get(i),
-            ObjectInspectorCopyOption.WRITABLE));
-      }
-      MapJoinObjectKey key = new MapJoinObjectKey(nr);
-      return key;
-      }
+    if (key == null || key.getKey().length != size) {
+      key = new MapJoinKey(new Object[size]);
+    }
+    Object[] array = key.getKey();
+    for (int keyIndex = 0; keyIndex < size; keyIndex++) {
+      array[keyIndex] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(keyIndex)
+          .evaluate(row), keyFieldsOI.get(keyIndex), ObjectInspectorCopyOption.WRITABLE));
+    }
+    return key;
   }
 
 
@@ -354,7 +331,7 @@ public class JoinUtil {
   }
 
 
-  public static RowContainer getRowContainer(Configuration hconf,
+  public static RowContainer<List<Object>> getRowContainer(Configuration hconf,
       List<ObjectInspector> structFieldObjectInspectors,
       Byte alias,int containerSize, TableDesc[] spillTableDesc,
       JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
@@ -366,7 +343,7 @@ public class JoinUtil {
       containerSize = -1;
     }
 
-    RowContainer rc = new RowContainer(containerSize, hconf, reporter);
+    RowContainer<List<Object>> rc = new RowContainer<List<Object>>(containerSize, hconf, reporter);
     StructObjectInspector rcOI = null;
     if (tblDesc != null) {
       // arbitrary column names used internally for serializing to spill table

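The reworked computeMapJoinKeys accepts the MapJoinKey built for the previous row
and overwrites its backing Object[] in place, so a probe-side caller avoids one key
allocation per row; passing null requests a fresh object, which a build-side caller
needs whenever the key will be stored in the table. A generic sketch of that
scratch-object reuse idiom follows; the Key class is a hypothetical stand-in, not
the Hive MapJoinKey:

    public class KeyReuseSketch {
      static final class Key {
        final Object[] fields;
        Key(Object[] fields) { this.fields = fields; }
      }

      // Overwrites 'reuse' in place when its arity matches; allocates otherwise.
      // Passing null always yields a fresh Key, e.g. one destined for a hash table.
      static Key computeKey(Key reuse, Object[] row, int size) {
        if (reuse == null || reuse.fields.length != size) {
          reuse = new Key(new Object[size]);
        }
        for (int i = 0; i < size; i++) {
          reuse.fields[i] = row[i]; // stands in for evaluate + copyToStandardObject
        }
        return reuse;
      }

      public static void main(String[] args) {
        Key scratch = null;
        for (int r = 0; r < 3; r++) {
          // Probe side: the same scratch Key is recycled across rows.
          scratch = computeKey(scratch, new Object[] {"row" + r, r}, 2);
          System.out.println(scratch.fields[0] + "/" + scratch.fields[1]);
        }
        // Build side: pass null so each stored key owns its own array.
        Key stored = computeKey(null, new Object[] {"k", 0}, 2);
        System.out.println(stored != scratch); // true
      }
    }
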
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 
@@ -27,20 +30,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -51,23 +51,15 @@ public class MapJoinOperator extends Abs
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
-
-  protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-
-  protected static MapJoinMetaData metadata = new MapJoinMetaData();
-  public static MapJoinMetaData getMetadata() {
-    return metadata;
-  }
-
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join exceeds available memory. "
           + "Please try removing the mapjoin hint."};
 
-  protected transient MapJoinRowContainer<ArrayList<Object>>[] rowContainerMap;
-  transient int metadataKeyTag;
-  transient int[] metadataValueTag;
-  transient boolean hashTblInitedOnce;
+  private transient MapJoinTableContainer[] mapJoinTables;
+  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+  private transient boolean hashTblInitedOnce;
+  private transient MapJoinKey key;
 
   public MapJoinOperator() {
   }
@@ -77,35 +69,11 @@ public class MapJoinOperator extends Abs
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   protected void initializeOp(Configuration hconf) throws HiveException {
-
     super.initializeOp(hconf);
-
-    metadataValueTag = new int[numAliases];
-    for (int pos = 0; pos < numAliases; pos++) {
-      metadataValueTag[pos] = -1;
-    }
-
-    metadataKeyTag = -1;
-
     int tagLen = conf.getTagLength();
-
-    mapJoinTables = new HashMapWrapper[tagLen];
-    rowContainerMap = new MapJoinRowContainer[tagLen];
-    // initialize the hash tables for other tables
-    for (int pos = 0; pos < numAliases; pos++) {
-      if (pos == posBigTable) {
-        continue;
-      }
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
-
-      mapJoinTables[pos] = hashTable;
-      MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
-      rowContainerMap[pos] = rowContainer;
-    }
-
+    mapJoinTables = new MapJoinTableContainer[tagLen];
+    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     hashTblInitedOnce = false;
   }
 
@@ -118,14 +86,12 @@ public class MapJoinOperator extends Abs
   public void generateMapMetaData() throws HiveException, SerDeException {
     // generate the meta data for key
     // index for key is -1
+    
     TableDesc keyTableDesc = conf.getKeyTblDesc();
     SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
         null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
-    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
-        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
-            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-
+    MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
     for (int pos = 0; pos < order.length; pos++) {
       if (pos == posBigTable) {
         continue;
@@ -139,16 +105,12 @@ public class MapJoinOperator extends Abs
       SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
           null);
       valueSerDe.initialize(null, valueTableDesc.getProperties());
-
-      ObjectInspector inspector = valueSerDe.getObjectInspector();
-      metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
-          .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
-          valueSerDe, valueTableDesc, hasFilter(pos), hconf));
+      MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+      mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
     }
   }
 
   private void loadHashTable() throws HiveException {
-
     if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
       if (hashTblInitedOnce) {
         return;
@@ -158,12 +120,9 @@ public class MapJoinOperator extends Abs
     }
 
     String baseDir = null;
-
     String currentInputFile = getExecContext().getCurrentInputFile();
     LOG.info("******* Load from HashTable File: input : " + currentInputFile);
-
     String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
-
     try {
       if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
         baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
@@ -183,18 +142,25 @@ public class MapJoinOperator extends Abs
           baseDir = archiveLocalLink.toUri().getPath();
         }
       }
-      for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-        HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = mapJoinTables[pos];
-        if (hashtable == null) {
+      for (int pos = 0; pos < mapJoinTables.length; pos++) {
+        if (pos == posBigTable) {
           continue;
         }
-        String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName);
+        if (baseDir == null) {
+          throw new IllegalStateException("baseDir cannot be null");
+        }
+        String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName);
         Path path = new Path(filePath);
-        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
-        hashtable.initilizePersistentHash(path.toUri().getPath());
+        LOG.info("\tLoading hashtable file back from tmp file URI: " + path);
+        ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
+            new FileInputStream(path.toUri().getPath()), 4096));
+        try {
+          mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
+        } finally {
+          in.close();
+        }
       }
     } catch (Exception e) {
-      LOG.error("Load Distributed Cache Error", e);
       throw new HiveException(e);
     }
   }
@@ -208,39 +174,31 @@ public class MapJoinOperator extends Abs
         generateMapMetaData();
         firstRow = false;
       }
-
       loadHashTable();
     } catch (SerDeException e) {
-      e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-
     try {
       if (firstRow) {
         // generate the map metadata
         generateMapMetaData();
         firstRow = false;
       }
-
       alias = (byte)tag;
 
       // compute keys and values as StandardObjects
-      AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+      key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias],
           joinKeysObjectInspectors[alias]);
-
       boolean joinNeeded = false;
       for (byte pos = 0; pos < order.length; pos++) {
         if (pos != alias) {
-
-          MapJoinObjectValue o = mapJoinTables[pos].get(key);
-          MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap[pos];
-
+          MapJoinRowContainer rowContainer = mapJoinTables[pos].get(key);
           // there is no join-value or join-key has all null elements
-          if (o == null || key.hasAnyNulls(nullsafes)) {
+          if (rowContainer == null || key.hasAnyNulls(nullsafes)) {
             if (!noOuterJoin) {
               joinNeeded = true;
               storage[pos] = dummyObjVectors[pos];
@@ -249,45 +207,36 @@ public class MapJoinOperator extends Abs
             }
           } else {
             joinNeeded = true;
-            rowContainer.reset(o.getObj());
-            storage[pos] = rowContainer;
-            aliasFilterTags[pos] = o.getAliasFilter();
+            storage[pos] = rowContainer.copy();
+            aliasFilterTags[pos] = rowContainer.getAliasFilter();
           }
         }
       }
-
       if (joinNeeded) {
         ArrayList<Object> value = getFilteredValue(alias, row);
-
         // Add the value to the ArrayList
         storage[alias].add(value);
-
         // generate the output records
         checkAndGenObject();
       }
-
       // done with the row
       storage[tag].clear();
-
       for (byte pos = 0; pos < order.length; pos++) {
         if (pos != tag) {
           storage[pos] = null;
         }
       }
-
     } catch (SerDeException e) {
-      e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
-
     if (mapJoinTables != null) {
-      for (HashMapWrapper<?, ?> hashTable : mapJoinTables) {
-        if (hashTable != null) {
-          hashTable.close();
+      for (MapJoinTableContainer tableContainer : mapJoinTables) {
+        if (tableContainer != null) {
+          tableContainer.clear();
         }
       }
     }
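HashTableSinkOperator now persists each small-table container through a buffered
ObjectOutputStream, and loadHashTable above reads it back through the matching
buffered ObjectInputStream, each closed in a finally block. A stripped-down round
trip of that dump-and-reload discipline, substituting plain java.io and a HashMap
for the Hadoop FileSystem and MapJoinTableContainerSerDe:

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.HashMap;

    public class DumpReloadSketch {
      public static void main(String[] args) throws IOException, ClassNotFoundException {
        File dump = File.createTempFile("smalltable", ".hashtable");
        HashMap<String, String> table = new HashMap<String, String>();
        table.put("k1", "v1");

        // Sink side: buffered write, closed in finally (mirrors closeOp above).
        ObjectOutputStream out =
            new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(dump), 4096));
        try {
          out.writeObject(table);
        } finally {
          out.close();
        }
        table.clear(); // the sink drops its in-memory copy once the dump is on disk

        // Map side: buffered read, closed in finally (mirrors loadHashTable above).
        ObjectInputStream in =
            new ObjectInputStream(new BufferedInputStream(new FileInputStream(dump), 4096));
        try {
          @SuppressWarnings("unchecked")
          HashMap<String, String> loaded = (HashMap<String, String>) in.readObject();
          System.out.println(loaded.get("k1")); // v1
        } finally {
          in.close();
        }
      }
    }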