Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/14 21:07:05 UTC

svn commit: r1631841 [17/42] - in /hive/branches/llap: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/ accumulo-handler/src/java/org/apache/hadoop/hiv...

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -90,42 +91,11 @@ public class SQLStdHiveAccessController 
       HiveAuthenticationProvider authenticator, HiveAuthzSessionContext ctx) throws HiveAuthzPluginException {
     this.metastoreClientFactory = metastoreClientFactory;
     this.authenticator = authenticator;
-    this.sessionCtx = applyTestSettings(ctx, conf);
-
-    assertHiveCliAuthDisabled(conf);
-    initUserRoles();
+    this.sessionCtx = SQLAuthorizationUtils.applyTestSettings(ctx, conf);
     LOG.info("Created SQLStdHiveAccessController for session context : " + sessionCtx);
   }
 
   /**
-   * Change the session context based on configuration to aid in testing of sql std auth
-   * @param ctx
-   * @param conf
-   * @return
-   */
-  private HiveAuthzSessionContext applyTestSettings(HiveAuthzSessionContext ctx, HiveConf conf) {
-    if(conf.getBoolVar(ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE) &&
-        ctx.getClientType() == CLIENT_TYPE.HIVECLI
-        ){
-      // create new session ctx object with HS2 as client type
-      HiveAuthzSessionContext.Builder ctxBuilder = new HiveAuthzSessionContext.Builder(ctx);
-      ctxBuilder.setClientType(CLIENT_TYPE.HIVESERVER2);
-      return ctxBuilder.build();
-    }
-    return ctx;
-  }
-
-  private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException {
-    if (sessionCtx.getClientType() == CLIENT_TYPE.HIVECLI
-        && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
-      throw new HiveAuthzPluginException(
-          "SQL standards based authorization should not be enabled from hive cli"
-              + "Instead the use of storage based authorization in hive metastore is reccomended. Set "
-              + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli");
-    }
-  }
-
-  /**
    * (Re-)initialize currentRoleNames if necessary.
    * @throws HiveAuthzPluginException
    */
@@ -381,9 +351,9 @@ public class SQLStdHiveAccessController 
   @Override
   public List<HiveRoleGrant> getPrincipalGrantInfoForRole(String roleName) throws HiveAuthzPluginException, HiveAccessControlException {
     // only user belonging to admin role can list role
-    if (!isUserAdmin()) {
+    if (!isUserAdmin() &&  !doesUserHasAdminOption(Arrays.asList(roleName))) {
       throw new HiveAccessControlException("Current user : " + currentUserName+ " is not"
-        + " allowed get principals in a role. " + ADMIN_ONLY_MSG);
+        + " allowed get principals in a role. " + ADMIN_ONLY_MSG + " Otherwise, " + HAS_ADMIN_PRIV_MSG);
     }
     try {
       return getHiveRoleGrants(metastoreClientFactory.getHiveMetastoreClient(), roleName);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java Tue Oct 14 19:06:45 2014
@@ -25,12 +25,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizationValidator;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal;
@@ -44,16 +47,30 @@ public class SQLStdHiveAuthorizationVali
   private final HiveConf conf;
   private final HiveAuthenticationProvider authenticator;
   private final SQLStdHiveAccessControllerWrapper privController;
+  private final HiveAuthzSessionContext ctx;
   public static final Log LOG = LogFactory.getLog(SQLStdHiveAuthorizationValidator.class);
 
   public SQLStdHiveAuthorizationValidator(HiveMetastoreClientFactory metastoreClientFactory,
       HiveConf conf, HiveAuthenticationProvider authenticator,
-      SQLStdHiveAccessControllerWrapper privilegeManager) {
+      SQLStdHiveAccessControllerWrapper privilegeManager, HiveAuthzSessionContext ctx)
+      throws HiveAuthzPluginException {
 
     this.metastoreClientFactory = metastoreClientFactory;
     this.conf = conf;
     this.authenticator = authenticator;
     this.privController = privilegeManager;
+    this.ctx = SQLAuthorizationUtils.applyTestSettings(ctx, conf);
+    assertHiveCliAuthDisabled(conf);
+  }
+
+  private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException {
+    if (ctx.getClientType() == CLIENT_TYPE.HIVECLI
+        && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+      throw new HiveAuthzPluginException(
+          "SQL standards based authorization should not be enabled from hive cli"
+              + "Instead the use of storage based authorization in hive metastore is reccomended. Set "
+              + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli");
+    }
   }
 
   @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java Tue Oct 14 19:06:45 2014
@@ -37,7 +37,7 @@ public class SQLStdHiveAuthorizerFactory
     return new HiveAuthorizerImpl(
         privilegeManager,
         new SQLStdHiveAuthorizationValidator(metastoreClientFactory, conf, authenticator,
-            privilegeManager)
+            privilegeManager, ctx)
         );
   }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Oct 14 19:06:45 2014
@@ -515,16 +515,17 @@ public class SessionState {
    */
   private Path createRootHDFSDir(HiveConf conf) throws IOException {
     Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
-    FsPermission expectedHDFSDirPermission = new FsPermission("777");
+    FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
     FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
     if (!fs.exists(rootHDFSDirPath)) {
-      Utilities.createDirsWithPermission(conf, rootHDFSDirPath, expectedHDFSDirPermission, true);
+      Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
     }
     FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
     LOG.debug("HDFS root scratch dir: " + rootHDFSDirPath + ", permission: "
         + currentHDFSDirPermission);
-    // If the root HDFS scratch dir already exists, make sure the permissions are 777.
-    if (!expectedHDFSDirPermission.equals(fs.getFileStatus(rootHDFSDirPath).getPermission())) {
+    // If the root HDFS scratch dir already exists, make sure it is writeable.
+    if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
+        .toShort()) == writableHDFSDirPermission.toShort())) {
       throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath
           + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
     }
@@ -1244,7 +1245,7 @@ public class SessionState {
 
     try {
       if (tezSessionState != null) {
-        TezSessionPoolManager.getInstance().close(tezSessionState);
+        TezSessionPoolManager.getInstance().close(tezSessionState, false);
       }
     } catch (Exception e) {
       LOG.info("Error closing tez session", e);

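The SessionState change above relaxes the scratch-dir check from requiring permissions of exactly 777 to requiring a minimum permission mask (00733), verified with a bitwise AND. Below is a minimal standalone sketch of that mask test in plain Java, with octal shorts standing in for the HDFS permission bits; the class and method names are illustrative, not Hive code.

public class PermissionMaskSketch {
  // A directory satisfies the minimum when every bit required by the mask is
  // present in its current permission; extra bits (e.g. the sticky bit) are fine.
  static boolean satisfiesMinimum(short current, short required) {
    return (current & required) == required;
  }

  public static void main(String[] args) {
    short required = (short) 0733;                                  // minimum writable permission
    System.out.println(satisfiesMinimum((short) 0777, required));   // true
    System.out.println(satisfiesMinimum((short) 01777, required));  // true (sticky bit is extra)
    System.out.println(satisfiesMinimum((short) 0755, required));   // false, group/other write missing
  }
}
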
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Tue Oct 14 19:06:45 2014
@@ -18,10 +18,18 @@
 
 package org.apache.hadoop.hive.ql.stats;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -79,20 +87,16 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 public class StatsUtils {
 
   private static final Log LOG = LogFactory.getLog(StatsUtils.class.getName());
 
+
   /**
    * Collect table, partition and column level statistics
    * @param conf
@@ -109,15 +113,34 @@ public class StatsUtils {
   public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
       Table table, TableScanOperator tableScanOperator) throws HiveException {
 
-    Statistics stats = new Statistics();
-
     // column level statistics are required only for the columns that are needed
     List<ColumnInfo> schema = tableScanOperator.getSchema().getSignature();
     List<String> neededColumns = tableScanOperator.getNeededColumns();
+    List<String> referencedColumns = tableScanOperator.getReferencedColumns();
+
+    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns);
+  }
+
+  private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      List<String> referencedColumns) throws HiveException {
+
     boolean fetchColStats =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS);
     boolean fetchPartStats =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS);
+
+    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns,
+        fetchColStats, fetchPartStats);
+  }
+
+  public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats)
+      throws HiveException {
+
+    Statistics stats = new Statistics();
+
     float deserFactor =
         HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_DESERIALIZATION_FACTOR);
 
@@ -207,7 +230,6 @@ public class StatsUtils {
           stats.getBasicStatsState().equals(State.COMPLETE)) {
         stats.setBasicStatsState(State.PARTIAL);
       }
-      boolean haveFullStats = fetchColStats;
       if (fetchColStats) {
         List<String> partNames = new ArrayList<String>(partList.getNotDeniedPartns().size());
         for (Partition part : partList.getNotDeniedPartns()) {
@@ -215,37 +237,84 @@ public class StatsUtils {
         }
         Map<String, String> colToTabAlias = new HashMap<String, String>();
         neededColumns = processNeededColumns(schema, neededColumns, colToTabAlias);
-        AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), neededColumns, partNames);
+        AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(),
+            neededColumns, partNames);
         if (null == aggrStats) {
-          haveFullStats = false;
+          // There are some partitions with no state (or we didn't fetch any state).
+          // Update the stats with empty list to reflect that in the
+          // state/initialize structures.
+          List<ColStatistics> emptyStats = Lists.newArrayList();
+
+          // add partition column stats
+          addParitionColumnStats(neededColumns, referencedColumns, schema, table, partList,
+              emptyStats);
+
+          stats.addToColumnStats(emptyStats);
+          stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns));
         } else {
           List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
           if (colStats.size() != neededColumns.size()) {
-            LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to retrieve"
-                + " for " + colStats.size() + " columns");
+            LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" +
+                " retrieve for " + colStats.size() + " columns");
           }
-          List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), colToTabAlias);
+          List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(),
+              colToTabAlias);
+
+          addParitionColumnStats(neededColumns, referencedColumns, schema, table, partList,
+              columnStats);
+
           stats.addToColumnStats(columnStats);
-          State colState = deriveStatType(columnStats, neededColumns);
+          State colState = deriveStatType(columnStats, referencedColumns);
           if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
-            LOG.debug("Column stats requested for : " + partNames.size() +" partitions. "
-              + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
+            LOG.debug("Column stats requested for : " + partNames.size() + " partitions. "
+                + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
             colState = State.PARTIAL;
           }
           stats.setColumnStatsState(colState);
         }
       }
-      // There are some partitions with no state (or we didn't fetch any state).
-      // Update the stats with empty list to reflect that in the state/initialize structures.
-      if (!haveFullStats) {
-        List<ColStatistics> emptyStats = Lists.<ColStatistics>newArrayList();
-        stats.addToColumnStats(emptyStats);
-        stats.updateColumnStatsState(deriveStatType(emptyStats, neededColumns));
-      }
     }
     return stats;
   }
 
+  private static void addParitionColumnStats(List<String> neededColumns,
+      List<String> referencedColumns, List<ColumnInfo> schema, Table table,
+      PrunedPartitionList partList, List<ColStatistics> colStats)
+      throws HiveException {
+
+    // extra columns is difference between referenced columns vs needed
+    // columns. The difference could be partition columns.
+    List<String> extraCols = Lists.newArrayList(referencedColumns);
+    if (referencedColumns.size() > neededColumns.size()) {
+      extraCols.removeAll(neededColumns);
+      for (String col : extraCols) {
+        for (ColumnInfo ci : schema) {
+          // conditions for being partition column
+          if (col.equals(ci.getInternalName()) && ci.getIsVirtualCol() &&
+              !ci.isHiddenVirtualCol()) {
+            // currently metastore does not store column stats for
+            // partition column, so we calculate the NDV from pruned
+            // partition list
+            ColStatistics partCS = new ColStatistics(table.getTableName(),
+                ci.getInternalName(), ci.getType().getTypeName());
+            long numPartitions = getNDVPartitionColumn(partList.getPartitions(),
+                ci.getInternalName());
+            partCS.setCountDistint(numPartitions);
+            colStats.add(partCS);
+          }
+        }
+      }
+    }
+  }
+
+  public static int getNDVPartitionColumn(Set<Partition> partitions, String partColName) {
+    Set<String> distinctVals = new HashSet<String>(partitions.size());
+    for (Partition partition : partitions) {
+      distinctVals.add(partition.getSpec().get(partColName));
+    }
+    return distinctVals.size();
+  }
+
   private static void setUnknownRcDsToAverage(
       List<Long> rowCounts, List<Long> dataSizes, int avgRowSize) {
     if (LOG.isDebugEnabled()) {
@@ -751,7 +820,8 @@ public class StatsUtils {
         || colType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) {
       return JavaDataModel.get().primitive1();
     } else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)
-        || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
+        || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)
+        || colType.equalsIgnoreCase("long")) {
       return JavaDataModel.get().primitive2();
     } else if (colType.equalsIgnoreCase(serdeConstants.TIMESTAMP_TYPE_NAME)) {
       return JavaDataModel.get().lengthOfTimestamp();
@@ -780,7 +850,8 @@ public class StatsUtils {
       return JavaDataModel.get().lengthForIntArrayOfSize(length);
     } else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) {
       return JavaDataModel.get().lengthForDoubleArrayOfSize(length);
-    } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
+    } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)
+        || colType.equalsIgnoreCase("long")) {
       return JavaDataModel.get().lengthForLongArrayOfSize(length);
     } else if (colType.equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME)) {
       return JavaDataModel.get().lengthForByteArrayOfSize(length);
@@ -876,7 +947,7 @@ public class StatsUtils {
       Statistics parentStats, Map<String, ExprNodeDesc> colExprMap, RowSchema rowSchema) {
 
     List<ColStatistics> cs = Lists.newArrayList();
-    if (colExprMap != null) {
+    if (colExprMap != null  && rowSchema != null) {
       for (ColumnInfo ci : rowSchema.getSignature()) {
         String outColName = ci.getInternalName();
         outColName = StatsUtils.stripPrefixFromColumnName(outColName);
@@ -1042,10 +1113,8 @@ public class StatsUtils {
 
   /**
    * Get basic stats of table
-   * @param dbName
-   *          - database name
-   * @param tabName
-   *          - table name
+   * @param table
+   *          - table
    * @param statType
    *          - type of stats
    * @return value of stats
@@ -1283,4 +1352,11 @@ public class StatsUtils {
     }
 
   }
+
+  public static long getAvailableMemory(Configuration conf) {
+    int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+        conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+    return memory;
+  }
 }

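The addParitionColumnStats/getNDVPartitionColumn additions above estimate the NDV of a partition column from the pruned partition list, because the metastore does not keep column statistics for partition columns. The following is a standalone sketch of that counting step, using plain maps in place of Hive's Partition objects; the names here are illustrative only.

import java.util.*;

class PartitionColumnNdvSketch {
  // The NDV of a partition column is the number of distinct values the
  // partition key takes across the partitions that survived pruning.
  static int ndvForPartitionColumn(List<Map<String, String>> partitionSpecs, String partCol) {
    Set<String> distinct = new HashSet<String>();
    for (Map<String, String> spec : partitionSpecs) {
      distinct.add(spec.get(partCol));      // e.g. spec = {ds=2014-10-14}
    }
    return distinct.size();
  }

  public static void main(String[] args) {
    List<Map<String, String>> specs = Arrays.asList(
        Collections.singletonMap("ds", "2014-10-13"),
        Collections.singletonMap("ds", "2014-10-14"),
        Collections.singletonMap("ds", "2014-10-14"));
    System.out.println(ndvForPartitionColumn(specs, "ds"));   // prints 2
  }
}
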
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Tue Oct 14 19:06:45 2014
@@ -24,15 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnListImpl;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,7 +38,12 @@ import org.apache.hadoop.util.StringUtil
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A class to clean directories after compactions.  This will run in a separate thread.
@@ -50,35 +52,85 @@ public class Cleaner extends CompactorTh
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
-  private long cleanerCheckInterval = 5000;
+  private long cleanerCheckInterval = 0;
+
+  // List of compactions to clean.
+  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
+  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
 
   @Override
   public void run() {
-    // Make sure nothing escapes this run method and kills the metastore at large,
-    // so wrap it in a big catch Throwable statement.
+    if (cleanerCheckInterval == 0) {
+      cleanerCheckInterval = conf.getTimeVar(
+          HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
     do {
+      // This is solely for testing.  It checks if the test has set the looped value to false,
+      // and if so remembers that and then sets it to true at the end.  We have to check here
+      // first to make sure we go through a complete iteration of the loop before resetting it.
+      boolean setLooped = !looped.boolVal;
+      // Make sure nothing escapes this run method and kills the metastore at large,
+      // so wrap it in a big catch Throwable statement.
       try {
         long startedAt = System.currentTimeMillis();
 
-        // Now look for new entries ready to be cleaned.
+        // First look for all the compactions that are waiting to be cleaned.  If we have not
+        // seen an entry before, look for all the locks held on that table or partition and
+        // record them.  We will then only clean the partition once all of those locks have been
+        // released.  This way we avoid removing the files while they are in use,
+        // while at the same time avoiding starving the cleaner as new readers come along.
+        // This works because we know that any reader who comes along after the worker thread has
+        // done the compaction will read the more up to date version of the data (either in a
+        // newer delta or in a newer base).
         List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-        for (CompactionInfo ci : toClean) {
-          LockComponent comp = null;
-          comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname);
-          comp.setTablename(ci.tableName);
-          if (ci.partName != null)  comp.setPartitionname(ci.partName);
-          List<LockComponent> components = new ArrayList<LockComponent>(1);
-          components.add(comp);
-          LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
-              Worker.hostname());
-          LockResponse rsp = txnHandler.lockNoWait(rqst);
+        if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
+          ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
+
+          for (CompactionInfo ci : toClean) {
+            // Check to see if we have seen this request before.  If so, ignore it.  If not,
+            // add it to our queue.
+            if (!compactId2LockMap.containsKey(ci.id)) {
+              compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
+              compactId2CompactInfoMap.put(ci.id, ci);
+            }
+          }
+
+          // Now, for each entry in the queue, see if all of the associated locks are clear so we
+          // can clean
+          Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
+          List<Long> expiredLocks = new ArrayList<Long>();
+          List<Long> compactionsCleaned = new ArrayList<Long>();
           try {
-            if (rsp.getState() == LockState.ACQUIRED) {
-              clean(ci);
+            for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
+              boolean sawLock = false;
+              for (Long lockId : queueEntry.getValue()) {
+                if (currentLocks.contains(lockId)) {
+                  sawLock = true;
+                  break;
+                } else {
+                  expiredLocks.add(lockId);
+                }
+              }
+
+              if (!sawLock) {
+                // Remember to remove this when we're out of the loop,
+                // we can't do it in the loop or we'll get a concurrent modification exception.
+                compactionsCleaned.add(queueEntry.getKey());
+                clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
+              } else {
+                // Remove the locks we didn't see so we don't look for them again next time
+                for (Long lockId : expiredLocks) {
+                  queueEntry.getValue().remove(lockId);
+                }
+              }
             }
           } finally {
-            if (rsp.getState() == LockState.ACQUIRED) {
-              txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+            if (compactionsCleaned.size() > 0) {
+              for (Long compactId : compactionsCleaned) {
+                compactId2LockMap.remove(compactId);
+                compactId2CompactInfoMap.remove(compactId);
+              }
             }
           }
         }
@@ -91,9 +143,37 @@ public class Cleaner extends CompactorTh
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
             StringUtils.stringifyException(t));
       }
+      if (setLooped) {
+        looped.boolVal = true;
+      }
     } while (!stop.boolVal);
   }
 
+  private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
+    Set<Long> relatedLocks = new HashSet<Long>();
+    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+      if (ci.dbname.equals(lock.getDbname())) {
+        if ((ci.tableName == null && lock.getTablename() == null) ||
+            (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+          if ((ci.partName == null && lock.getPartname() == null) ||
+              (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
+            relatedLocks.add(lock.getLockid());
+          }
+        }
+      }
+    }
+
+    return relatedLocks;
+  }
+
+  private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
+    Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
+    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+      currentLocks.add(lock.getLockid());
+    }
+    return currentLocks;
+  }
+
   private void clean(CompactionInfo ci) throws MetaException {
     LOG.info("Starting cleaning for " + ci.getFullPartitionName());
     try {

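The Cleaner rewrite above replaces the exclusive-lock approach with lock tracking: when a compaction becomes ready to clean, the cleaner records the lock ids open at that moment and cleans only once all of them have been released, so in-flight readers are never undercut while new readers cannot starve the cleaner. Below is a standalone sketch of that bookkeeping in plain Java; the class and method names are illustrative, not the actual Cleaner API.

import java.util.*;

class CleanerQueueSketch {
  // compaction id -> lock ids that must drain before that compaction is cleaned
  private final Map<Long, Set<Long>> pending = new HashMap<Long, Set<Long>>();

  void register(long compactionId, Set<Long> locksOpenAtQueueTime) {
    if (!pending.containsKey(compactionId)) {
      pending.put(compactionId, new HashSet<Long>(locksOpenAtQueueTime));
    }
  }

  // Prunes lock ids that have since been released and returns the compaction
  // ids whose recorded locks have all drained (removing them from the queue).
  List<Long> readyToClean(Set<Long> currentLocks) {
    List<Long> ready = new ArrayList<Long>();
    Iterator<Map.Entry<Long, Set<Long>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Set<Long>> entry = it.next();
      entry.getValue().retainAll(currentLocks);   // drop locks no longer held
      if (entry.getValue().isEmpty()) {
        ready.add(entry.getKey());
        it.remove();
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    CleanerQueueSketch queue = new CleanerQueueSketch();
    queue.register(1L, new HashSet<Long>(Arrays.asList(10L, 11L)));
    System.out.println(queue.readyToClean(Collections.singleton(10L)));   // [] - lock 10 still held
    System.out.println(queue.readyToClean(Collections.<Long>emptySet())); // [1] - all recorded locks released
  }
}
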
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Tue Oct 14 19:06:45 2014
@@ -506,13 +506,15 @@ public class CompactorMR {
       ValidTxnList txnList =
           new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
 
+      boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =
-          aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+          aif.getRawReader(jobConf, isMajor, split.getBucket(),
               txnList, split.getBaseDir(), split.getDeltaDirs());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
       while (reader.next(identifier, value)) {
+        if (isMajor && reader.isDelete(value)) continue;
         writer.write(value);
         reporter.progress();
       }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Tue Oct 14 19:06:45 2014
@@ -53,6 +53,7 @@ abstract class CompactorThread extends T
   protected RawStore rs;
   protected int threadId;
   protected BooleanPointer stop;
+  protected BooleanPointer looped;
 
   @Override
   public void setHiveConf(HiveConf conf) {
@@ -66,8 +67,9 @@ abstract class CompactorThread extends T
   }
 
   @Override
-  public void init(BooleanPointer stop) throws MetaException {
+  public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
     this.stop = stop;
+    this.looped = looped;
     setPriority(MIN_PRIORITY);
     setDaemon(true); // this means the process will exit without waiting for this thread
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Oct 14 19:06:45 2014
@@ -76,7 +76,7 @@ public class Initiator extends Compactor
         // don't doom the entire thread.
         try {
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+          ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
           Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
@@ -137,8 +137,8 @@ public class Initiator extends Compactor
   }
 
   @Override
-  public void init(BooleanPointer stop) throws MetaException {
-    super.init(stop);
+  public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+    super.init(stop, looped);
     checkInterval =
         conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Tue Oct 14 19:06:45 2014
@@ -120,7 +120,7 @@ public class Worker extends CompactorThr
 
         final boolean isMajor = ci.isMajorCompaction();
         final ValidTxnList txns =
-            TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+            TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
         final StringBuffer jobName = new StringBuffer(name);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
@@ -168,8 +168,8 @@ public class Worker extends CompactorThr
   }
 
   @Override
-  public void init(BooleanPointer stop) throws MetaException {
-    super.init(stop);
+  public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+    super.init(stop, looped);
 
     StringBuilder name = new StringBuilder(hostname());
     name.append("-");

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java Tue Oct 14 19:06:45 2014
@@ -52,29 +52,6 @@ public class UDFLog extends UDFMath {
   }
 
   /**
-   * Get the logarithm of the given decimal with the given base.
-   */
-  public DoubleWritable evaluate(DoubleWritable base, HiveDecimalWritable writable) {
-    if (base == null || writable == null) {
-      return null;
-    }
-    double d = writable.getHiveDecimal().bigDecimalValue().doubleValue();
-    return log(base.get(), d);
-  }
-
-  /**
-   * Get the logarithm of input with the given decimal as the base.
-   */
-  public DoubleWritable evaluate(HiveDecimalWritable base, DoubleWritable d) {
-    if (base == null || d == null) {
-      return null;
-    }
-
-    double b = base.getHiveDecimal().bigDecimalValue().doubleValue();
-    return log(b, d.get());
-  }
-
-  /**
    * Get the logarithm of the given decimal input with the given decimal base.
    */
   public DoubleWritable evaluate(HiveDecimalWritable baseWritable, HiveDecimalWritable writable) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java Tue Oct 14 19:06:45 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.No
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -318,4 +320,17 @@ public abstract class GenericUDFBaseNume
   public void setAnsiSqlArithmetic(boolean ansiSqlArithmetic) {
     this.ansiSqlArithmetic = ansiSqlArithmetic;
   }
+
+  public PrimitiveTypeInfo deriveMinArgumentCast(
+      ExprNodeDesc childExpr, TypeInfo targetType) {
+    assert targetType instanceof PrimitiveTypeInfo : "Not a primitive type" + targetType;
+    PrimitiveTypeInfo pti = (PrimitiveTypeInfo)targetType;
+    // We only do the minimum cast for decimals. Other types are assumed safe; fix if needed.
+    // We also don't do anything for non-primitive children (maybe we should assert).
+    if ((pti.getPrimitiveCategory() != PrimitiveCategory.DECIMAL)
+        || (!(childExpr.getTypeInfo() instanceof PrimitiveTypeInfo))) return pti;
+    PrimitiveTypeInfo childTi = (PrimitiveTypeInfo)childExpr.getTypeInfo();
+    // If the child is also decimal, no cast is needed (we hope - can target type be narrower?).
+    return HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(childTi);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java Tue Oct 14 19:06:45 2014
@@ -22,6 +22,7 @@ import java.util.TimeZone;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -33,7 +34,9 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.Text;
 
-
+@Description(name = "from_utc_timestamp",
+             value = "from_utc_timestamp(timestamp, string timezone) - "
+                     + "Assumes given timestamp ist UTC and converts to given timezone (as of Hive 0.8.0)")
 public class GenericUDFFromUtcTimestamp extends GenericUDF {
 
   static final Log LOG = LogFactory.getLog(GenericUDFFromUtcTimestamp.class);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -62,6 +63,11 @@ import org.apache.hadoop.hive.ql.exec.ve
  * otherwise it returns expr3. IF() returns a numeric or string value, depending
  * on the context in which it is used.
  */
+@Description(
+    name = "if",
+    value = "IF(expr1,expr2,expr3) - If expr1 is TRUE (expr1 <> 0 and expr1 <> NULL) then"
+    + " IF() returns expr2; otherwise it returns expr3. IF() returns a numeric or string value,"
+    + " depending on the context in which it is used.")
 @VectorizedExpressions({
   IfExprLongColumnLongColumn.class, IfExprDoubleColumnDoubleColumn.class,
   IfExprLongColumnLongScalar.class, IfExprDoubleColumnDoubleScalar.class,

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java Tue Oct 14 19:06:45 2014
@@ -26,9 +26,11 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
 
 /**
  * GenericUDFIndex.
@@ -36,11 +38,10 @@ import org.apache.hadoop.hive.serde2.obj
  */
 @Description(name = "index", value = "_FUNC_(a, n) - Returns the n-th element of a ")
 public class GenericUDFIndex extends GenericUDF {
+
   private transient MapObjectInspector mapOI;
-  private boolean mapKeyPreferWritable;
   private transient ListObjectInspector listOI;
-  private transient PrimitiveObjectInspector indexOI;
-  private transient ObjectInspector returnOI;
+  private transient ObjectInspectorConverters.Converter converter;
 
   @Override
   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -66,21 +67,22 @@ public class GenericUDFIndex extends Gen
     }
 
     // index has to be a primitive
-    if (arguments[1] instanceof PrimitiveObjectInspector) {
-      indexOI = (PrimitiveObjectInspector) arguments[1];
-    } else {
+    if (!(arguments[1] instanceof PrimitiveObjectInspector)) {
       throw new UDFArgumentTypeException(1, "Primitive Type is expected but "
           + arguments[1].getTypeName() + "\" is found");
     }
-
+    PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) arguments[1];
+    ObjectInspector returnOI;
+    ObjectInspector indexOI;
     if (mapOI != null) {
+      indexOI = ObjectInspectorConverters.getConvertedOI(
+          inputOI, mapOI.getMapKeyObjectInspector());
       returnOI = mapOI.getMapValueObjectInspector();
-      ObjectInspector keyOI = mapOI.getMapKeyObjectInspector();
-      mapKeyPreferWritable = ((PrimitiveObjectInspector) keyOI)
-          .preferWritable();
     } else {
+      indexOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
       returnOI = listOI.getListElementObjectInspector();
     }
+    converter = ObjectInspectorConverters.getConverter(inputOI, indexOI);
 
     return returnOI;
   }
@@ -88,35 +90,16 @@ public class GenericUDFIndex extends Gen
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
     assert (arguments.length == 2);
-    Object main = arguments[0].get();
     Object index = arguments[1].get();
 
+    Object indexObject = converter.convert(index);
+    if (indexObject == null) {
+      return null;
+    }
     if (mapOI != null) {
-
-      Object indexObject;
-      if (mapKeyPreferWritable) {
-        indexObject = indexOI.getPrimitiveWritableObject(index);
-      } else {
-        indexObject = indexOI.getPrimitiveJavaObject(index);
-      }
-      return mapOI.getMapValueElement(main, indexObject);
-
-    } else {
-
-      assert (listOI != null);
-      int intIndex = 0;
-      try {
-        intIndex = PrimitiveObjectInspectorUtils.getInt(index, indexOI);
-      } catch (NullPointerException e) {
-        // If index is null, we should return null.
-        return null;
-      } catch (NumberFormatException e) {
-        // If index is not a number, we should return null.
-        return null;
-      }
-      return listOI.getListElement(main, intIndex);
-
+      return mapOI.getMapValueElement(arguments[0].get(), indexObject);
     }
+    return listOI.getListElement(arguments[0].get(), ((IntWritable)indexObject).get());
   }
 
   @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java Tue Oct 14 19:06:45 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hive.serde2.obj
  * Creates a TimestampWritable object using PrimitiveObjectInspectorConverter
  *
  */
+@Description(name = "timestamp",
+value = "cast(date as timestamp) - Returns timestamp")
 @VectorizedExpressions({CastLongToTimestampViaLongToLong.class,
   CastDoubleToTimestampViaDoubleToLong.class, CastDecimalToTimestamp.class})
 public class GenericUDFTimestamp extends GenericUDF {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java Tue Oct 14 19:06:45 2014
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import org.apache.hadoop.hive.ql.exec.Description;
 
+@Description(name = "to_utc_timestamp",
+             value = "to_utc_timestamp(timestamp, string timezone) - "
+                     + "Assumes given timestamp is in given timezone and converts to UTC (as of Hive 0.8.0)")
 public class GenericUDFToUtcTimestamp extends
     GenericUDFFromUtcTimestamp {
 

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java Tue Oct 14 19:06:45 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -80,7 +79,7 @@ public class TestFunctionRegistry extend
   }
 
   private void implicit(TypeInfo a, TypeInfo b, boolean convertible) {
-    assertEquals(convertible, FunctionRegistry.implicitConvertable(a,b));
+    assertEquals(convertible, FunctionRegistry.implicitConvertible(a, b));
   }
 
   public void testImplicitConversion() {

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Tue Oct 14 19:06:45 2014
@@ -18,17 +18,24 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.plan.Pl
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -49,8 +60,14 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.junit.Test;
 
 /**
  * TestOperators.
@@ -274,7 +291,7 @@ public class TestOperators extends TestC
           cd, sop);
 
       op.initialize(new JobConf(TestOperators.class),
-          new ObjectInspector[] {r[0].oi});
+          new ObjectInspector[]{r[0].oi});
 
       // evaluate on row
       for (int i = 0; i < 5; i++) {
@@ -314,7 +331,8 @@ public class TestOperators extends TestC
       Configuration hconf = new JobConf(TestOperators.class);
       HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
           "hdfs:///testDir/testFile");
-      IOContext.get().setInputPath(new Path("hdfs:///testDir/testFile"));
+      IOContext.get(hconf.get(Utilities.INPUT_NAME)).setInputPath(
+          new Path("hdfs:///testDir/testFile"));
 
       // initialize pathToAliases
       ArrayList<String> aliases = new ArrayList<String>();
@@ -379,4 +397,82 @@ public class TestOperators extends TestC
       throw (e);
     }
   }
+
+  @Test
+  public void testFetchOperatorContextQuoting() throws Exception {
+    JobConf conf = new JobConf();
+    ArrayList<Path> list = new ArrayList<Path>();
+    list.add(new Path("hdfs://nn.example.com/fi\tl\\e\t1"));
+    list.add(new Path("hdfs://nn.example.com/file\t2"));
+    list.add(new Path("file:/file3"));
+    FetchOperator.setFetchOperatorContext(conf, list);
+    String[] parts =
+        conf.get(FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST).split("\t");
+    assertEquals(3, parts.length);
+    assertEquals("hdfs://nn.example.com/fi\\tl\\\\e\\t1", parts[0]);
+    assertEquals("hdfs://nn.example.com/file\\t2", parts[1]);
+    assertEquals("file:/file3", parts[2]);
+  }
+
+  /**
+   * A custom input format that checks to make sure that the fetch operator
+   * sets the required attributes.
+   */
+  public static class CustomInFmt extends TextInputFormat {
+
+    @Override
+    public InputSplit[] getSplits(JobConf job, int splits) throws IOException {
+
+      // ensure that the table properties were copied
+      assertEquals("val1", job.get("myprop1"));
+      assertEquals("val2", job.get("myprop2"));
+
+      // ensure that both of the partitions are in the complete list.
+      String[] dirs = job.get("hive.complete.dir.list").split("\t");
+      assertEquals(2, dirs.length);
+      assertEquals(true, dirs[0].endsWith("/state=CA"));
+      assertEquals(true, dirs[1].endsWith("/state=OR"));
+      return super.getSplits(job, splits);
+    }
+  }
+
+  @Test
+  public void testFetchOperatorContext() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("hive.support.concurrency", "false");
+    SessionState.start(conf);
+    String cmd = "create table fetchOp (id int, name string) " +
+        "partitioned by (state string) " +
+        "row format delimited fields terminated by '|' " +
+        "stored as " +
+        "inputformat 'org.apache.hadoop.hive.ql.exec.TestOperators$CustomInFmt' " +
+        "outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
+        "tblproperties ('myprop1'='val1', 'myprop2' = 'val2')";
+    Driver driver = new Driver();
+    driver.init();
+    CommandProcessorResponse response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+    List<Object> result = new ArrayList<Object>();
+
+    cmd = "load data local inpath '../data/files/employee.dat' " +
+        "overwrite into table fetchOp partition (state='CA')";
+    driver.init();
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+
+    cmd = "load data local inpath '../data/files/employee2.dat' " +
+        "overwrite into table fetchOp partition (state='OR')";
+    driver.init();
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+
+    cmd = "select * from fetchOp";
+    driver.init();
+    driver.setMaxRows(500);
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+    driver.getResults(result);
+    assertEquals(20, result.size());
+    driver.close();
+  }
 }

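For reference, the new testFetchOperatorContextQuoting above pins down the quoting scheme for the tab-separated FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST value: backslashes and tabs inside each path are escaped so a literal tab can serve as the list separator. A minimal standalone sketch of that escaping, inferred from the assertions (the class and method names below are illustrative, not from the patch):

public class FetchDirListQuotingSketch {
  // Escape order matters: backslashes first, then tabs, so an escaped
  // backslash is never re-read as the start of an escaped tab.
  static String escape(String path) {
    return path.replace("\\", "\\\\").replace("\t", "\\t");
  }

  public static void main(String[] args) {
    String[] paths = {
        "hdfs://nn.example.com/fi\tl\\e\t1",
        "hdfs://nn.example.com/file\t2",
        "file:/file3"
    };
    StringBuilder joined = new StringBuilder();
    for (String p : paths) {
      if (joined.length() > 0) {
        joined.append('\t');   // a literal tab separates the escaped entries
      }
      joined.append(escape(p));
    }
    // Splitting on the literal tab recovers exactly three escaped entries,
    // matching the assertions in testFetchOperatorContextQuoting.
    System.out.println(joined.toString().split("\t").length);   // prints 3
  }
}
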
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Tue Oct 14 19:06:45 2014
@@ -26,6 +26,7 @@ import java.util.Random;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public class TestTezSessionPool {
@@ -157,4 +158,29 @@ public class TestTezSessionPool {
         }
       }
     }
+
+  @Test
+  public void testCloseAndOpenDefault() throws Exception {
+    poolManager = new TestTezSessionPoolManager();
+    TezSessionState session = Mockito.mock(TezSessionState.class);
+    Mockito.when(session.isDefault()).thenReturn(false);
+
+    poolManager.closeAndOpen(session, conf, false);
+
+    Mockito.verify(session).close(false);
+    Mockito.verify(session).open(conf, null);
+  }
+
+  @Test
+  public void testCloseAndOpenWithResources() throws Exception {
+    poolManager = new TestTezSessionPoolManager();
+    TezSessionState session = Mockito.mock(TezSessionState.class);
+    Mockito.when(session.isDefault()).thenReturn(false);
+    String[] extraResources = new String[] { "file:///tmp/foo.jar" };
+
+    poolManager.closeAndOpen(session, conf, extraResources, false);
+
+    Mockito.verify(session).close(false);
+    Mockito.verify(session).open(conf, extraResources);
+  }
 }

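The two new TestTezSessionPool tests above fix the expected delegation of closeAndOpen for non-default sessions (the mocks opt out of pooling via isDefault() == false): the session is closed with the supplied flag and reopened with the same conf plus whatever extra resources were passed, or null when none were. A rough sketch of the behaviour the mocks encode, with guessed parameter names, not the actual pool-manager code:

  void closeAndOpen(TezSessionState session, HiveConf conf,
      String[] extraResources, boolean flag) throws Exception {
    session.close(flag);                 // verified: close(false)
    session.open(conf, extraResources);  // verified: open(conf, null) or open(conf, extraResources)
  }
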
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Tue Oct 14 19:06:45 2014
@@ -30,9 +30,11 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.mapred.JobConf;
@@ -90,8 +93,11 @@ public class TestTezTask {
     path = mock(Path.class);
     when(path.getFileSystem(any(Configuration.class))).thenReturn(fs);
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
-    when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
-        any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() {
+    when(
+        utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
+            any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
+            anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer(
+        new Answer<Vertex>() {
 
           @Override
           public Vertex answer(InvocationOnMock invocation) throws Throwable {
@@ -101,8 +107,8 @@ public class TestTezTask {
           }
         });
 
-    when(utils.createEdge(any(JobConf.class), any(Vertex.class),
-        any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
+    when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class),
+            any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() {
 
           @Override
           public Edge answer(InvocationOnMock invocation) throws Throwable {
@@ -204,10 +210,11 @@ public class TestTezTask {
   @Test
   public void testSubmit() throws Exception {
     DAG dag = DAG.create("test");
-    task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
+    task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
+        new String[0], Collections.<String,LocalResource> emptyMap());
     // validate close/reopen
-    verify(sessionState, times(1)).open(any(HiveConf.class));
-    verify(sessionState, times(1)).close(eq(false));  // now uses pool after HIVE-7043
+    verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class));
+    verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043
     verify(session, times(2)).submitDAG(any(DAG.class));
   }
 
@@ -216,4 +223,54 @@ public class TestTezTask {
     task.close(work, 0);
     verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
   }
+
+  @Test
+  public void testExistingSessionGetsStorageHandlerResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
+    verify(session).addAppMasterLocalFiles(resMap);
+  }
+
+  @Test
+  public void testExtraResourcesAddedToDag() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+    DAG dag = mock(DAG.class);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
+    verify(dag).addTaskLocalFiles(resMap);
+  }
+
+  @Test
+  public void testGetExtraLocalResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+
+    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+  }
 }

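Taken together, the three new TestTezTask tests above trace how storage-handler jars are expected to reach a reused Tez session. The call sequence they exercise, shown as a commented sketch using the test's own variable names (not the actual TezTask code path):

  // 1. Localize the extra jars and key the resulting LocalResources by base name.
  Map<String, LocalResource> resMap =
      task.getExtraLocalResources(conf, path, inputOutputJars);

  // 2. If the session is already open but does not yet hold the jars,
  //    push them to the application master.
  task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
  //    -> verified via session.addAppMasterLocalFiles(resMap)

  // 3. Make the same resources visible to the DAG's tasks.
  task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
  //    -> verified via dag.addTaskLocalFiles(resMap)
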
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Tue Oct 14 19:06:45 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -601,6 +602,30 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testCountReduce() throws HiveException {
+    testAggregateCountReduce(
+            2,
+            Arrays.asList(new Long[]{}),
+            0L);
+    testAggregateCountReduce(
+            2,
+            Arrays.asList(new Long[]{0L}),
+            0L);
+    testAggregateCountReduce(
+            2,
+            Arrays.asList(new Long[]{0L,0L}),
+            0L);
+    testAggregateCountReduce(
+            2,
+            Arrays.asList(new Long[]{0L,1L,0L}),
+            1L);
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{13L,0L,7L,19L}),
+        39L);
+  }
+
+  @Test
   public void testCountDecimal() throws HiveException {
     testAggregateDecimal(
         "Decimal",
@@ -1210,7 +1235,7 @@ public class TestVectorGroupByOperator {
         "count",
         2,
         Arrays.asList(new Long[]{}),
-        null);
+        0L);
   }
 
   @Test
@@ -2027,6 +2052,17 @@ public class TestVectorGroupByOperator {
     testAggregateCountStarIterable (fdr, expected);
   }
 
+  public void testAggregateCountReduce (
+      int batchSize,
+      Iterable<Long> values,
+      Object expected) throws HiveException {
+
+    @SuppressWarnings("unchecked")
+    FakeVectorRowBatchFromLongIterables fdr = new FakeVectorRowBatchFromLongIterables(batchSize,
+        values);
+    testAggregateCountReduceIterable (fdr, expected);
+  }
+
 
   public static interface Validator {
     void validate (String key, Object expected, Object result);
@@ -2223,6 +2259,37 @@ public class TestVectorGroupByOperator {
     validator.validate("_total", expected, result);
   }
 
+  public void testAggregateCountReduceIterable (
+      Iterable<VectorizedRowBatch> data,
+      Object expected) throws HiveException {
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("A", 0);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+    GroupByDesc desc = buildGroupByDescType(ctx, "count", "A", TypeInfoFactory.longTypeInfo);
+    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+    vectorDesc.setIsReduce(true);
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+    FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+
+    for (VectorizedRowBatch unit: data) {
+      vgo.processOp(unit,  0);
+    }
+    vgo.close(false);
+
+    List<Object> outBatchList = out.getCapturedRows();
+    assertNotNull(outBatchList);
+    assertEquals(1, outBatchList.size());
+
+    Object result = outBatchList.get(0);
+
+    Validator validator = getValidator("count");
+    validator.validate("_total", expected, result);
+  }
+
   public void testAggregateStringIterable (
       String aggregateName,
       Iterable<VectorizedRowBatch> data,

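The reduce-side COUNT changes above follow from how the vectorized GROUP BY merges partial aggregates: with setIsReduce(true) the operator sums incoming partial counts rather than counting rows, so an empty input merges to 0 (which is why the expected value for the empty iterable changes from null to 0L), and the last case in testCountReduce is simply the running sum of its inputs, 13 + 0 + 7 + 19 = 39, whereas a map-side count of the same batch would have produced 4.
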
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java Tue Oct 14 19:06:45 2014
@@ -90,7 +90,9 @@ public class StorageFormats {
    * includes both native Hive storage formats as well as those enumerated in the
    * ADDITIONAL_STORAGE_FORMATS table.
    *
-   * @return List of storage format as paramters.
+   * @return List of storage format as a Collection of Object arrays, each containing (in order):
+   *         Storage format name, SerDe class name, InputFormat class name, OutputFormat class name.
+   *         This list is used as the parameters to JUnit parameterized tests.
    */
   public static Collection<Object[]> asParameters() {
     List<Object[]> parameters = new ArrayList<Object[]>();
@@ -130,5 +132,21 @@ public class StorageFormats {
 
     return parameters;
   }
+
+  /**
+   * Returns a list of the names of storage formats.
+   *
+   * @return List of names of storage formats.
+   */
+  public static Collection<Object[]> names() {
+    List<Object[]> names = new ArrayList<Object[]>();
+    for (StorageFormatDescriptor descriptor : ServiceLoader.load(StorageFormatDescriptor.class)) {
+      String[] formatNames = new String[descriptor.getNames().size()];
+      formatNames = descriptor.getNames().toArray(formatNames);
+      String[] params = { formatNames[0] };
+      names.add(params);
+    }
+    return names;
+  }
 }
 

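The new StorageFormats.names() helper mirrors asParameters() but exposes only the storage format names. A usage sketch of how a JUnit 4 parameterized test could consume it (the test class below is illustrative only and is not part of the patch):

import static org.junit.Assert.assertNotNull;

import java.util.Collection;

import org.apache.hadoop.hive.ql.io.StorageFormats;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class ExampleStorageFormatNameTest {

  @Parameters
  public static Collection<Object[]> formatNames() {
    // each Object[] holds a single element: the storage format name
    return StorageFormats.names();
  }

  private final String formatName;

  public ExampleStorageFormatNameTest(String formatName) {
    this.formatName = formatName;
  }

  @Test
  public void formatNameIsUsable() {
    // a real test would splice formatName into e.g. a "STORED AS <format>" clause
    assertNotNull(formatName);
  }
}
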
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Tue Oct 14 19:06:45 2014
@@ -217,11 +217,11 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+    // The two original buckets won't be in the obsolete list because we don't look at those
+    // until we have determined there is no base.
     List<FileStatus> obsolete = dir.getObsolete();
-    assertEquals(3, obsolete.size());
+    assertEquals(1, obsolete.size());
     assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
 

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Tue Oct 14 19:06:45 2014
@@ -115,7 +115,8 @@ public class TestHiveBinarySearchRecordR
   }
 
   private void resetIOContext() {
-    ioContext = IOContext.get();
+    conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
+    ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME));
     ioContext.setUseSorted(false);
     ioContext.setIsBinarySearching(false);
     ioContext.setEndBinarySearch(false);
@@ -124,6 +125,7 @@ public class TestHiveBinarySearchRecordR
   }
 
   private void init() throws IOException {
+    conf = new JobConf();
     resetIOContext();
     rcfReader = mock(RCFileRecordReader.class);
     when(rcfReader.next((LongWritable)anyObject(),
@@ -131,7 +133,6 @@ public class TestHiveBinarySearchRecordR
     // Since the start is 0, and the length is 100, the first call to sync should be with the value
     // 50 so return that for getPos()
     when(rcfReader.getPos()).thenReturn(50L);
-    conf = new JobConf();
     conf.setBoolean("hive.input.format.sorted", true);
 
     TableDesc tblDesc = Utilities.defaultTd;

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Tue Oct 14 19:06:45 2014
@@ -165,7 +165,7 @@ public class TestSymlinkTextInputFormat 
             + " failed with exit code= " + ecode);
       }
 
-      String cmd = "select key from " + tblName;
+      String cmd = "select key*1 from " + tblName;
       drv.compile(cmd);
 
       //create scratch dir

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Tue Oct 14 19:06:45 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.io.sarg
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -118,7 +119,6 @@ public class TestInputOutputFormat {
     TimeZone gmt = TimeZone.getTimeZone("GMT+0");
     DATE_FORMAT.setTimeZone(gmt);
     TIME_FORMAT.setTimeZone(gmt);
-    TimeZone local = TimeZone.getDefault();
   }
 
   public static class BigRow implements Writable {
@@ -560,6 +560,12 @@ public class TestInputOutputFormat {
       this.file = file;
     }
 
+    /**
+     * Set the blocks and their location for the file.
+     * Must be called after the stream is closed or the block length will be
+     * wrong.
+     * @param blocks the list of blocks
+     */
     public void setBlocks(MockBlock... blocks) {
       file.blocks = blocks;
       int offset = 0;
@@ -580,12 +586,18 @@ public class TestInputOutputFormat {
       file.content = new byte[file.length];
       System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
     }
+
+    @Override
+    public String toString() {
+      return "Out stream to " + file.toString();
+    }
   }
 
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
 
+    @SuppressWarnings("unused")
     public MockFileSystem() {
       // empty
     }
@@ -620,7 +632,7 @@ public class TestInputOutputFormat {
           return new FSDataInputStream(new MockInputStream(file));
         }
       }
-      return null;
+      throw new IOException("File not found: " + path);
     }
 
     @Override
@@ -743,8 +755,12 @@ public class TestInputOutputFormat {
           for(MockBlock block: file.blocks) {
             if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
                 block.length, start, len) > 0) {
+              String[] topology = new String[block.hosts.length];
+              for(int i=0; i < topology.length; ++i) {
+                topology[i] = "/rack/ " + block.hosts[i];
+              }
               result.add(new BlockLocation(block.hosts, block.hosts,
-                  block.offset, block.length));
+                  topology, block.offset, block.length));
             }
           }
           return result.toArray(new BlockLocation[result.size()]);
@@ -1209,7 +1225,8 @@ public class TestInputOutputFormat {
                                          Path warehouseDir,
                                          String tableName,
                                          ObjectInspector objectInspector,
-                                         boolean isVectorized
+                                         boolean isVectorized,
+                                         int partitions
                                          ) throws IOException {
     Utilities.clearWorkMap();
     JobConf conf = new JobConf();
@@ -1218,9 +1235,20 @@ public class TestInputOutputFormat {
     conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
     conf.set("fs.mock.impl", MockFileSystem.class.getName());
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
-    Path root = new Path(warehouseDir, tableName + "/p=0");
+    Path root = new Path(warehouseDir, tableName);
+    // clean out previous contents
     ((MockFileSystem) root.getFileSystem(conf)).clear();
-    conf.set("mapred.input.dir", root.toString());
+    // build partition strings
+    String[] partPath = new String[partitions];
+    StringBuilder buffer = new StringBuilder();
+    for(int p=0; p < partitions; ++p) {
+      partPath[p] = new Path(root, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+    conf.set("mapred.input.dir", buffer.toString());
     StringBuilder columnIds = new StringBuilder();
     StringBuilder columnNames = new StringBuilder();
     StringBuilder columnTypes = new StringBuilder();
@@ -1239,6 +1267,8 @@ public class TestInputOutputFormat {
     }
     conf.set("hive.io.file.readcolumn.ids", columnIds.toString());
     conf.set("partition_columns", "p");
+    conf.set(serdeConstants.LIST_COLUMNS, columnNames.toString());
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
     MockFileSystem fs = (MockFileSystem) warehouseDir.getFileSystem(conf);
     fs.clear();
 
@@ -1249,9 +1279,6 @@ public class TestInputOutputFormat {
     tblProps.put("columns.types", columnTypes.toString());
     TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
         tblProps);
-    LinkedHashMap<String, String> partSpec =
-        new LinkedHashMap<String, String>();
-    PartitionDesc part = new PartitionDesc(tbl, partSpec);
 
     MapWork mapWork = new MapWork();
     mapWork.setVectorMode(isVectorized);
@@ -1260,11 +1287,16 @@ public class TestInputOutputFormat {
         new LinkedHashMap<String, ArrayList<String>>();
     ArrayList<String> aliases = new ArrayList<String>();
     aliases.add(tableName);
-    aliasMap.put(root.toString(), aliases);
-    mapWork.setPathToAliases(aliasMap);
     LinkedHashMap<String, PartitionDesc> partMap =
         new LinkedHashMap<String, PartitionDesc>();
-    partMap.put(root.toString(), part);
+    for(int p=0; p < partitions; ++p) {
+      aliasMap.put(partPath[p], aliases);
+      LinkedHashMap<String, String> partSpec =
+          new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tbl, partSpec);
+      partMap.put(partPath[p], part);
+    }
+    mapWork.setPathToAliases(aliasMap);
     mapWork.setPathToPartitionInfo(partMap);
     mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
     mapWork.setScratchColumnVectorTypes(new HashMap<String,
@@ -1285,6 +1317,7 @@ public class TestInputOutputFormat {
    * @throws Exception
    */
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorization() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1294,7 +1327,7 @@ public class TestInputOutputFormat {
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorization", inspector, true);
+        "vectorization", inspector, true, 1);
 
     // write the orc file to the mock file system
     Writer writer =
@@ -1332,6 +1365,7 @@ public class TestInputOutputFormat {
    * @throws Exception
    */
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorizationWithBuckets() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1341,7 +1375,7 @@ public class TestInputOutputFormat {
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorBuckets", inspector, true);
+        "vectorBuckets", inspector, true, 1);
 
     // write the orc file to the mock file system
     Writer writer =
@@ -1377,10 +1411,11 @@ public class TestInputOutputFormat {
 
   // test acid with vectorization, no combine
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorizationWithAcid() throws Exception {
     StructObjectInspector inspector = new BigRowInspector();
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorizationAcid", inspector, true);
+        "vectorizationAcid", inspector, true, 1);
 
     // write the orc file to the mock file system
     Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1444,6 +1479,7 @@ public class TestInputOutputFormat {
 
   // test non-vectorized, non-acid, combine
   @Test
+  @SuppressWarnings("unchecked")
   public void testCombinationInputFormat() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1453,7 +1489,7 @@ public class TestInputOutputFormat {
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "combination", inspector, false);
+        "combination", inspector, false, 1);
 
     // write the orc file to the mock file system
     Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1516,17 +1552,25 @@ public class TestInputOutputFormat {
   public void testCombinationInputFormatWithAcid() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
+    final int PARTITIONS = 2;
+    final int BUCKETS = 3;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
           ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "combinationAcid", inspector, false);
+        "combinationAcid", inspector, false, PARTITIONS);
 
     // write the orc file to the mock file system
-    Path partDir = new Path(conf.get("mapred.input.dir"));
-    OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
+    Path[] partDir = new Path[PARTITIONS];
+    String[] paths = conf.getStrings("mapred.input.dir");
+    for(int p=0; p < PARTITIONS; ++p) {
+      partDir[p] = new Path(paths[p]);
+    }
+
+    // write a base file in partition 0
+    OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
             .writingBase(true).bucket(0).inspector(inspector));
     for(int i=0; i < 10; ++i) {
@@ -1534,31 +1578,68 @@ public class TestInputOutputFormat {
     }
     WriterImpl baseWriter = (WriterImpl) writer.getWriter();
     writer.close(false);
+
     MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
-    int length0 = outputStream.file.length;
-    writer = new OrcRecordUpdater(partDir,
+    outputStream.setBlocks(new MockBlock("host1", "host2"));
+
+    // write a delta file in partition 0
+    writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
             .writingBase(true).bucket(1).inspector(inspector));
     for(int i=10; i < 20; ++i) {
       writer.insert(10, new MyRow(i, 2*i));
     }
-    baseWriter = (WriterImpl) writer.getWriter();
+    WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
+    outputStream = (MockOutputStream) deltaWriter.getStream();
     writer.close(false);
-    outputStream = (MockOutputStream) baseWriter.getStream();
     outputStream.setBlocks(new MockBlock("host1", "host2"));
 
+    // write three files in partition 1
+    for(int bucket=0; bucket < BUCKETS; ++bucket) {
+      Writer orc = OrcFile.createWriter(
+          new Path(partDir[1], "00000" + bucket + "_0"),
+          OrcFile.writerOptions(conf)
+              .blockPadding(false)
+              .bufferSize(1024)
+              .inspector(inspector));
+      orc.addRow(new MyRow(1, 2));
+      outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+      orc.close();
+      outputStream.setBlocks(new MockBlock("host3", "host4"));
+    }
+
     // call getsplits
+    conf.setInt(hive_metastoreConstants.BUCKET_COUNT, BUCKETS);
     HiveInputFormat<?,?> inputFormat =
         new CombineHiveInputFormat<WritableComparable, Writable>();
-    try {
-      InputSplit[] splits = inputFormat.getSplits(conf, 1);
-      assertTrue("shouldn't reach here", false);
-    } catch (IOException ioe) {
-      assertEquals("CombineHiveInputFormat is incompatible"
-          + "  with ACID tables. Please set hive.input.format=org.apache.hadoop"
-          + ".hive.ql.io.HiveInputFormat",
-          ioe.getMessage());
+    InputSplit[] splits = inputFormat.getSplits(conf, 1);
+    assertEquals(3, splits.length);
+    HiveInputFormat.HiveInputSplit split =
+        (HiveInputFormat.HiveInputSplit) splits[0];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(580, split.getLength());
+    split = (HiveInputFormat.HiveInputSplit) splits[1];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(601, split.getLength());
+    CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
+        (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
+    assertEquals(BUCKETS, combineSplit.getNumPaths());
+    for(int bucket=0; bucket < BUCKETS; ++bucket) {
+      assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
+          combineSplit.getPath(bucket).toString());
+      assertEquals(0, combineSplit.getOffset(bucket));
+      assertEquals(225, combineSplit.getLength(bucket));
     }
+    String[] hosts = combineSplit.getLocations();
+    assertEquals(2, hosts.length);
   }
 
   @Test