Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/14 21:07:05 UTC
svn commit: r1631841 [17/42] - in /hive/branches/llap: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
accumulo-handler/src/java/org/apache/hadoop/hiv...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -90,42 +91,11 @@ public class SQLStdHiveAccessController
HiveAuthenticationProvider authenticator, HiveAuthzSessionContext ctx) throws HiveAuthzPluginException {
this.metastoreClientFactory = metastoreClientFactory;
this.authenticator = authenticator;
- this.sessionCtx = applyTestSettings(ctx, conf);
-
- assertHiveCliAuthDisabled(conf);
- initUserRoles();
+ this.sessionCtx = SQLAuthorizationUtils.applyTestSettings(ctx, conf);
LOG.info("Created SQLStdHiveAccessController for session context : " + sessionCtx);
}
/**
- * Change the session context based on configuration to aid in testing of sql std auth
- * @param ctx
- * @param conf
- * @return
- */
- private HiveAuthzSessionContext applyTestSettings(HiveAuthzSessionContext ctx, HiveConf conf) {
- if(conf.getBoolVar(ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE) &&
- ctx.getClientType() == CLIENT_TYPE.HIVECLI
- ){
- // create new session ctx object with HS2 as client type
- HiveAuthzSessionContext.Builder ctxBuilder = new HiveAuthzSessionContext.Builder(ctx);
- ctxBuilder.setClientType(CLIENT_TYPE.HIVESERVER2);
- return ctxBuilder.build();
- }
- return ctx;
- }
-
- private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException {
- if (sessionCtx.getClientType() == CLIENT_TYPE.HIVECLI
- && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
- throw new HiveAuthzPluginException(
- "SQL standards based authorization should not be enabled from hive cli"
- + "Instead the use of storage based authorization in hive metastore is reccomended. Set "
- + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli");
- }
- }
-
- /**
* (Re-)initialize currentRoleNames if necessary.
* @throws HiveAuthzPluginException
*/
@@ -381,9 +351,9 @@ public class SQLStdHiveAccessController
@Override
public List<HiveRoleGrant> getPrincipalGrantInfoForRole(String roleName) throws HiveAuthzPluginException, HiveAccessControlException {
// only user belonging to admin role can list role
- if (!isUserAdmin()) {
+ if (!isUserAdmin() && !doesUserHasAdminOption(Arrays.asList(roleName))) {
throw new HiveAccessControlException("Current user : " + currentUserName+ " is not"
- + " allowed get principals in a role. " + ADMIN_ONLY_MSG);
+ + " allowed get principals in a role. " + ADMIN_ONLY_MSG + " Otherwise, " + HAS_ADMIN_PRIV_MSG);
}
try {
return getHiveRoleGrants(metastoreClientFactory.getHiveMetastoreClient(), roleName);
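The guard above now also admits users who hold the role WITH ADMIN OPTION, not just members of the admin role. As a hypothetical sketch only (the real implementation lives elsewhere in this patch, and getRolesFromMS is an assumed stand-in for however the current user's role grants are fetched), such a check could be shaped like:

    // Hypothetical sketch; only HiveRoleGrant comes from the patch above.
    private boolean doesUserHasAdminOption(List<String> roleNames) throws HiveAuthzPluginException {
      List<HiveRoleGrant> currentRoles = getRolesFromMS(); // assumed helper: grants of the current user
      for (String roleName : roleNames) {
        boolean found = false;
        for (HiveRoleGrant grant : currentRoles) {
          if (grant.getRoleName().equalsIgnoreCase(roleName) && grant.isGrantOption()) {
            found = true; // user holds this role WITH ADMIN OPTION
            break;
          }
        }
        if (!found) {
          return false;
        }
      }
      return true;
    }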
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java Tue Oct 14 19:06:45 2014
@@ -25,12 +25,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizationValidator;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal;
@@ -44,16 +47,30 @@ public class SQLStdHiveAuthorizationVali
private final HiveConf conf;
private final HiveAuthenticationProvider authenticator;
private final SQLStdHiveAccessControllerWrapper privController;
+ private final HiveAuthzSessionContext ctx;
public static final Log LOG = LogFactory.getLog(SQLStdHiveAuthorizationValidator.class);
public SQLStdHiveAuthorizationValidator(HiveMetastoreClientFactory metastoreClientFactory,
HiveConf conf, HiveAuthenticationProvider authenticator,
- SQLStdHiveAccessControllerWrapper privilegeManager) {
+ SQLStdHiveAccessControllerWrapper privilegeManager, HiveAuthzSessionContext ctx)
+ throws HiveAuthzPluginException {
this.metastoreClientFactory = metastoreClientFactory;
this.conf = conf;
this.authenticator = authenticator;
this.privController = privilegeManager;
+ this.ctx = SQLAuthorizationUtils.applyTestSettings(ctx, conf);
+ assertHiveCliAuthDisabled(conf);
+ }
+
+ private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException {
+ if (ctx.getClientType() == CLIENT_TYPE.HIVECLI
+ && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+ throw new HiveAuthzPluginException(
+ "SQL standards based authorization should not be enabled from hive cli"
+ + "Instead the use of storage based authorization in hive metastore is reccomended. Set "
+ + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli");
+ }
}
@Override
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java Tue Oct 14 19:06:45 2014
@@ -37,7 +37,7 @@ public class SQLStdHiveAuthorizerFactory
return new HiveAuthorizerImpl(
privilegeManager,
new SQLStdHiveAuthorizationValidator(metastoreClientFactory, conf, authenticator,
- privilegeManager)
+ privilegeManager, ctx)
);
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Oct 14 19:06:45 2014
@@ -515,16 +515,17 @@ public class SessionState {
*/
private Path createRootHDFSDir(HiveConf conf) throws IOException {
Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
- FsPermission expectedHDFSDirPermission = new FsPermission("777");
+ FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
if (!fs.exists(rootHDFSDirPath)) {
- Utilities.createDirsWithPermission(conf, rootHDFSDirPath, expectedHDFSDirPermission, true);
+ Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
}
FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
LOG.debug("HDFS root scratch dir: " + rootHDFSDirPath + ", permission: "
+ currentHDFSDirPermission);
- // If the root HDFS scratch dir already exists, make sure the permissions are 777.
- if (!expectedHDFSDirPermission.equals(fs.getFileStatus(rootHDFSDirPath).getPermission())) {
+ // If the root HDFS scratch dir already exists, make sure it is writable.
+ if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
+ .toShort()) == writableHDFSDirPermission.toShort())) {
throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath
+ " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
}
@@ -1244,7 +1245,7 @@ public class SessionState {
try {
if (tezSessionState != null) {
- TezSessionPoolManager.getInstance().close(tezSessionState);
+ TezSessionPoolManager.getInstance().close(tezSessionState, false);
}
} catch (Exception e) {
LOG.info("Error closing tez session", e);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Tue Oct 14 19:06:45 2014
@@ -18,10 +18,18 @@
package org.apache.hadoop.hive.ql.stats;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -79,20 +87,16 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
public class StatsUtils {
private static final Log LOG = LogFactory.getLog(StatsUtils.class.getName());
+
/**
* Collect table, partition and column level statistics
* @param conf
@@ -109,15 +113,34 @@ public class StatsUtils {
public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
Table table, TableScanOperator tableScanOperator) throws HiveException {
- Statistics stats = new Statistics();
-
// column level statistics are required only for the columns that are needed
List<ColumnInfo> schema = tableScanOperator.getSchema().getSignature();
List<String> neededColumns = tableScanOperator.getNeededColumns();
+ List<String> referencedColumns = tableScanOperator.getReferencedColumns();
+
+ return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns);
+ }
+
+ private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+ Table table, List<ColumnInfo> schema, List<String> neededColumns,
+ List<String> referencedColumns) throws HiveException {
+
boolean fetchColStats =
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS);
boolean fetchPartStats =
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS);
+
+ return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns,
+ fetchColStats, fetchPartStats);
+ }
+
+ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+ Table table, List<ColumnInfo> schema, List<String> neededColumns,
+ List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats)
+ throws HiveException {
+
+ Statistics stats = new Statistics();
+
float deserFactor =
HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_DESERIALIZATION_FACTOR);
@@ -207,7 +230,6 @@ public class StatsUtils {
stats.getBasicStatsState().equals(State.COMPLETE)) {
stats.setBasicStatsState(State.PARTIAL);
}
- boolean haveFullStats = fetchColStats;
if (fetchColStats) {
List<String> partNames = new ArrayList<String>(partList.getNotDeniedPartns().size());
for (Partition part : partList.getNotDeniedPartns()) {
@@ -215,37 +237,84 @@ public class StatsUtils {
}
Map<String, String> colToTabAlias = new HashMap<String, String>();
neededColumns = processNeededColumns(schema, neededColumns, colToTabAlias);
- AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), neededColumns, partNames);
+ AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(),
+ neededColumns, partNames);
if (null == aggrStats) {
- haveFullStats = false;
+ // There are some partitions with no state (or we didn't fetch any state).
+ // Update the stats with empty list to reflect that in the
+ // state/initialize structures.
+ List<ColStatistics> emptyStats = Lists.newArrayList();
+
+ // add partition column stats
+ addPartitionColumnStats(neededColumns, referencedColumns, schema, table, partList,
+ emptyStats);
+
+ stats.addToColumnStats(emptyStats);
+ stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns));
} else {
List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
if (colStats.size() != neededColumns.size()) {
- LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to retrieve"
- + " for " + colStats.size() + " columns");
+ LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" +
+ " retrieve for " + colStats.size() + " columns");
}
- List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), colToTabAlias);
+ List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(),
+ colToTabAlias);
+
+ addPartitionColumnStats(neededColumns, referencedColumns, schema, table, partList,
+ columnStats);
+
stats.addToColumnStats(columnStats);
- State colState = deriveStatType(columnStats, neededColumns);
+ State colState = deriveStatType(columnStats, referencedColumns);
if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
- LOG.debug("Column stats requested for : " + partNames.size() +" partitions. "
- + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
+ LOG.debug("Column stats requested for : " + partNames.size() + " partitions. "
+ + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
colState = State.PARTIAL;
}
stats.setColumnStatsState(colState);
}
}
- // There are some partitions with no state (or we didn't fetch any state).
- // Update the stats with empty list to reflect that in the state/initialize structures.
- if (!haveFullStats) {
- List<ColStatistics> emptyStats = Lists.<ColStatistics>newArrayList();
- stats.addToColumnStats(emptyStats);
- stats.updateColumnStatsState(deriveStatType(emptyStats, neededColumns));
- }
}
return stats;
}
+ private static void addPartitionColumnStats(List<String> neededColumns,
+ List<String> referencedColumns, List<ColumnInfo> schema, Table table,
+ PrunedPartitionList partList, List<ColStatistics> colStats)
+ throws HiveException {
+
+ // Extra columns are the difference between referenced and needed
+ // columns; the difference may consist of partition columns.
+ List<String> extraCols = Lists.newArrayList(referencedColumns);
+ if (referencedColumns.size() > neededColumns.size()) {
+ extraCols.removeAll(neededColumns);
+ for (String col : extraCols) {
+ for (ColumnInfo ci : schema) {
+ // conditions for being a partition column
+ if (col.equals(ci.getInternalName()) && ci.getIsVirtualCol() &&
+ !ci.isHiddenVirtualCol()) {
+ // currently the metastore does not store column stats for
+ // partition columns, so we calculate the NDV from the pruned
+ // partition list
+ ColStatistics partCS = new ColStatistics(table.getTableName(),
+ ci.getInternalName(), ci.getType().getTypeName());
+ long numPartitions = getNDVPartitionColumn(partList.getPartitions(),
+ ci.getInternalName());
+ partCS.setCountDistint(numPartitions);
+ colStats.add(partCS);
+ }
+ }
+ }
+ }
+ }
+
+ public static int getNDVPartitionColumn(Set<Partition> partitions, String partColName) {
+ Set<String> distinctVals = new HashSet<String>(partitions.size());
+ for (Partition partition : partitions) {
+ distinctVals.add(partition.getSpec().get(partColName));
+ }
+ return distinctVals.size();
+ }
+
private static void setUnknownRcDsToAverage(
List<Long> rowCounts, List<Long> dataSizes, int avgRowSize) {
if (LOG.isDebugEnabled()) {
@@ -751,7 +820,8 @@ public class StatsUtils {
|| colType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) {
return JavaDataModel.get().primitive1();
} else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)
- || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
+ || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)
+ || colType.equalsIgnoreCase("long")) {
return JavaDataModel.get().primitive2();
} else if (colType.equalsIgnoreCase(serdeConstants.TIMESTAMP_TYPE_NAME)) {
return JavaDataModel.get().lengthOfTimestamp();
@@ -780,7 +850,8 @@ public class StatsUtils {
return JavaDataModel.get().lengthForIntArrayOfSize(length);
} else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) {
return JavaDataModel.get().lengthForDoubleArrayOfSize(length);
- } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
+ } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)
+ || colType.equalsIgnoreCase("long")) {
return JavaDataModel.get().lengthForLongArrayOfSize(length);
} else if (colType.equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME)) {
return JavaDataModel.get().lengthForByteArrayOfSize(length);
@@ -876,7 +947,7 @@ public class StatsUtils {
Statistics parentStats, Map<String, ExprNodeDesc> colExprMap, RowSchema rowSchema) {
List<ColStatistics> cs = Lists.newArrayList();
- if (colExprMap != null) {
+ if (colExprMap != null && rowSchema != null) {
for (ColumnInfo ci : rowSchema.getSignature()) {
String outColName = ci.getInternalName();
outColName = StatsUtils.stripPrefixFromColumnName(outColName);
@@ -1042,10 +1113,8 @@ public class StatsUtils {
/**
* Get basic stats of table
- * @param dbName
- * - database name
- * @param tabName
- * - table name
+ * @param table
+ * - table
* @param statType
* - type of stats
* @return value of stats
@@ -1283,4 +1352,11 @@ public class StatsUtils {
}
}
+
+ public static long getAvailableMemory(Configuration conf) {
+ int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+ conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+ return memory;
+ }
}
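The new getAvailableMemory helper prefers hive.tez.container.size when it is set to a positive value and otherwise falls back to the MapReduce map memory setting. A small usage sketch, with made-up values:

    // Illustration only; the values are arbitrary.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MAP_MEMORY_MB, 2048);
    // HIVETEZCONTAINERSIZE not set (<= 0): falls back to the map memory setting.
    System.out.println(StatsUtils.getAvailableMemory(conf)); // 2048
    conf.setInt(HiveConf.ConfVars.HIVETEZCONTAINERSIZE.varname, 4096);
    // A positive Tez container size takes precedence.
    System.out.println(StatsUtils.getAvailableMemory(conf)); // 4096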
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Tue Oct 14 19:06:45 2014
@@ -24,15 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnListImpl;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,7 +38,12 @@ import org.apache.hadoop.util.StringUtil
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* A class to clean directories after compactions. This will run in a separate thread.
@@ -50,35 +52,85 @@ public class Cleaner extends CompactorTh
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
- private long cleanerCheckInterval = 5000;
+ private long cleanerCheckInterval = 0;
+
+ // List of compactions to clean.
+ private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
+ private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
@Override
public void run() {
- // Make sure nothing escapes this run method and kills the metastore at large,
- // so wrap it in a big catch Throwable statement.
+ if (cleanerCheckInterval == 0) {
+ cleanerCheckInterval = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
do {
+ // This is solely for testing. It checks if the test has set the looped value to false,
+ // and if so remembers that and then sets it to true at the end. We have to check here
+ // first to make sure we go through a complete iteration of the loop before resetting it.
+ boolean setLooped = !looped.boolVal;
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
try {
long startedAt = System.currentTimeMillis();
- // Now look for new entries ready to be cleaned.
+ // First look for all the compactions that are waiting to be cleaned. If we have not
+ // seen an entry before, look for all the locks held on that table or partition and
+ // record them. We will then only clean the partition once all of those locks have been
+ // released. This way we avoid removing the files while they are in use,
+ // while at the same time avoiding starving the cleaner as new readers come along.
+ // This works because we know that any reader who comes along after the worker thread has
+ // done the compaction will read the more up to date version of the data (either in a
+ // newer delta or in a newer base).
List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- for (CompactionInfo ci : toClean) {
- LockComponent comp = null;
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname);
- comp.setTablename(ci.tableName);
- if (ci.partName != null) comp.setPartitionname(ci.partName);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
- Worker.hostname());
- LockResponse rsp = txnHandler.lockNoWait(rqst);
+ if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
+ ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
+
+ for (CompactionInfo ci : toClean) {
+ // Check to see if we have seen this request before. If so, ignore it. If not,
+ // add it to our queue.
+ if (!compactId2LockMap.containsKey(ci.id)) {
+ compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
+ compactId2CompactInfoMap.put(ci.id, ci);
+ }
+ }
+
+ // Now, for each entry in the queue, see if all of the associated locks are clear so we
+ // can clean
+ Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
+ List<Long> expiredLocks = new ArrayList<Long>();
+ List<Long> compactionsCleaned = new ArrayList<Long>();
try {
- if (rsp.getState() == LockState.ACQUIRED) {
- clean(ci);
+ for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
+ boolean sawLock = false;
+ for (Long lockId : queueEntry.getValue()) {
+ if (currentLocks.contains(lockId)) {
+ sawLock = true;
+ break;
+ } else {
+ expiredLocks.add(lockId);
+ }
+ }
+
+ if (!sawLock) {
+ // Remember to remove this when we're out of the loop,
+ // we can't do it in the loop or we'll get a concurrent modification exception.
+ compactionsCleaned.add(queueEntry.getKey());
+ clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
+ } else {
+ // Remove the locks we didn't see so we don't look for them again next time
+ for (Long lockId : expiredLocks) {
+ queueEntry.getValue().remove(lockId);
+ }
+ }
}
} finally {
- if (rsp.getState() == LockState.ACQUIRED) {
- txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+ if (compactionsCleaned.size() > 0) {
+ for (Long compactId : compactionsCleaned) {
+ compactId2LockMap.remove(compactId);
+ compactId2CompactInfoMap.remove(compactId);
+ }
}
}
}
@@ -91,9 +143,37 @@ public class Cleaner extends CompactorTh
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
StringUtils.stringifyException(t));
}
+ if (setLooped) {
+ looped.boolVal = true;
+ }
} while (!stop.boolVal);
}
+ private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
+ Set<Long> relatedLocks = new HashSet<Long>();
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ if (ci.dbname.equals(lock.getDbname())) {
+ if ((ci.tableName == null && lock.getTablename() == null) ||
+ (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+ if ((ci.partName == null && lock.getPartname() == null) ||
+ (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
+ relatedLocks.add(lock.getLockid());
+ }
+ }
+ }
+ }
+
+ return relatedLocks;
+ }
+
+ private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
+ Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ currentLocks.add(lock.getLockid());
+ }
+ return currentLocks;
+ }
+
private void clean(CompactionInfo ci) throws MetaException {
LOG.info("Starting cleaning for " + ci.getFullPartitionName());
try {
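In the reworked run loop above, the cleaner snapshots the lock ids that exist when a compaction first becomes ready, and cleans only after every one of those ids has disappeared; locks taken later are ignored because those readers already see the compacted data. A tiny worked example of that invariant:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class LockGateSketch {
      public static void main(String[] args) {
        // Locks present when the compaction became ready to clean:
        Set<Long> locksAtQueueTime = new HashSet<Long>(Arrays.asList(7L, 9L));
        // Locks present on this pass: 7 was released; 12 arrived later and is ignored.
        Set<Long> currentLocks = new HashSet<Long>(Arrays.asList(9L, 12L));
        locksAtQueueTime.retainAll(currentLocks); // forget released locks -> {9}
        System.out.println(locksAtQueueTime.isEmpty()); // false: lock 9 still blocks cleaning
      }
    }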
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Tue Oct 14 19:06:45 2014
@@ -506,13 +506,15 @@ public class CompactorMR {
ValidTxnList txnList =
new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
- aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+ aif.getRawReader(jobConf, isMajor, split.getBucket(),
txnList, split.getBaseDir(), split.getDeltaDirs());
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
getWriter(reporter, reader.getObjectInspector(), split.getBucket());
while (reader.next(identifier, value)) {
+ if (isMajor && reader.isDelete(value)) continue;
writer.write(value);
reporter.progress();
}
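For readability, here is the same loop with the reasoning behind the new guard spelled out as comments (identifiers exactly as in the hunk above):

    while (reader.next(identifier, value)) {
      // A major compaction rewrites the complete base for the bucket, so a
      // record that is only a delete event has nothing left to mask and can
      // be dropped here rather than carried into the new base.
      if (isMajor && reader.isDelete(value)) continue;
      writer.write(value);
      reporter.progress();
    }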
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Tue Oct 14 19:06:45 2014
@@ -53,6 +53,7 @@ abstract class CompactorThread extends T
protected RawStore rs;
protected int threadId;
protected BooleanPointer stop;
+ protected BooleanPointer looped;
@Override
public void setHiveConf(HiveConf conf) {
@@ -66,8 +67,9 @@ abstract class CompactorThread extends T
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
this.stop = stop;
+ this.looped = looped;
setPriority(MIN_PRIORITY);
setDaemon(true); // this means the process will exit without waiting for this thread
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Oct 14 19:06:45 2014
@@ -76,7 +76,7 @@ public class Initiator extends Compactor
// don't doom the entire thread.
try {
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+ ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -137,8 +137,8 @@ public class Initiator extends Compactor
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);
checkInterval =
conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Tue Oct 14 19:06:45 2014
@@ -120,7 +120,7 @@ public class Worker extends CompactorThr
final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
- TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+ TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
final StringBuffer jobName = new StringBuffer(name);
jobName.append("-compactor-");
jobName.append(ci.getFullPartitionName());
@@ -168,8 +168,8 @@ public class Worker extends CompactorThr
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);
StringBuilder name = new StringBuilder(hostname());
name.append("-");
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java Tue Oct 14 19:06:45 2014
@@ -52,29 +52,6 @@ public class UDFLog extends UDFMath {
}
/**
- * Get the logarithm of the given decimal with the given base.
- */
- public DoubleWritable evaluate(DoubleWritable base, HiveDecimalWritable writable) {
- if (base == null || writable == null) {
- return null;
- }
- double d = writable.getHiveDecimal().bigDecimalValue().doubleValue();
- return log(base.get(), d);
- }
-
- /**
- * Get the logarithm of input with the given decimal as the base.
- */
- public DoubleWritable evaluate(HiveDecimalWritable base, DoubleWritable d) {
- if (base == null || d == null) {
- return null;
- }
-
- double b = base.getHiveDecimal().bigDecimalValue().doubleValue();
- return log(b, d.get());
- }
-
- /**
* Get the logarithm of the given decimal input with the given decimal base.
*/
public DoubleWritable evaluate(HiveDecimalWritable baseWritable, HiveDecimalWritable writable) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java Tue Oct 14 19:06:45 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -318,4 +320,17 @@ public abstract class GenericUDFBaseNume
public void setAnsiSqlArithmetic(boolean ansiSqlArithmetic) {
this.ansiSqlArithmetic = ansiSqlArithmetic;
}
+
+ public PrimitiveTypeInfo deriveMinArgumentCast(
+ ExprNodeDesc childExpr, TypeInfo targetType) {
+ assert targetType instanceof PrimitiveTypeInfo : "Not a primitive type: " + targetType;
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo)targetType;
+ // We only do the minimum cast for decimals. Other types are assumed safe; fix if needed.
+ // We also don't do anything for non-primitive children (maybe we should assert).
+ if ((pti.getPrimitiveCategory() != PrimitiveCategory.DECIMAL)
+ || (!(childExpr.getTypeInfo() instanceof PrimitiveTypeInfo))) return pti;
+ PrimitiveTypeInfo childTi = (PrimitiveTypeInfo)childExpr.getTypeInfo();
+ // If the child is also decimal, no cast is needed (we hope - can target type be narrower?).
+ return HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(childTi);
+ }
}
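deriveMinArgumentCast above derives, for a decimal target, the narrowest decimal that can hold the child's type instead of casting to the full target width. A short usage sketch, assuming Hive's usual primitive-to-decimal mapping (an int fits in decimal(10,0)):

    // Sketch; TypeInfoFactory and HiveDecimalUtils are the classes imported above.
    PrimitiveTypeInfo intTi = TypeInfoFactory.intTypeInfo;
    PrimitiveTypeInfo minCast = HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(intTi);
    // For an int child under a decimal target, the derived cast is the narrow
    // decimal(10,0) rather than the possibly much wider target decimal type.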
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java Tue Oct 14 19:06:45 2014
@@ -22,6 +22,7 @@ import java.util.TimeZone;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -33,7 +34,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
-
+@Description(name = "from_utc_timestamp",
+ value = "from_utc_timestamp(timestamp, string timezone) - "
+ + "Assumes given timestamp ist UTC and converts to given timezone (as of Hive 0.8.0)")
public class GenericUDFFromUtcTimestamp extends GenericUDF {
static final Log LOG = LogFactory.getLog(GenericUDFFromUtcTimestamp.class);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.udf.generic;
+import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -62,6 +63,11 @@ import org.apache.hadoop.hive.ql.exec.ve
* otherwise it returns expr3. IF() returns a numeric or string value, depending
* on the context in which it is used.
*/
+@Description(
+ name = "if",
+ value = "IF(expr1,expr2,expr3) - If expr1 is TRUE (expr1 <> 0 and expr1 <> NULL) then"
+ + " IF() returns expr2; otherwise it returns expr3. IF() returns a numeric or string value,"
+ + " depending on the context in which it is used.")
@VectorizedExpressions({
IfExprLongColumnLongColumn.class, IfExprDoubleColumnDoubleColumn.class,
IfExprLongColumnLongScalar.class, IfExprDoubleColumnDoubleScalar.class,
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java Tue Oct 14 19:06:45 2014
@@ -26,9 +26,11 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
/**
* GenericUDFIndex.
@@ -36,11 +38,10 @@ import org.apache.hadoop.hive.serde2.obj
*/
@Description(name = "index", value = "_FUNC_(a, n) - Returns the n-th element of a ")
public class GenericUDFIndex extends GenericUDF {
+
private transient MapObjectInspector mapOI;
- private boolean mapKeyPreferWritable;
private transient ListObjectInspector listOI;
- private transient PrimitiveObjectInspector indexOI;
- private transient ObjectInspector returnOI;
+ private transient ObjectInspectorConverters.Converter converter;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
@@ -66,21 +67,22 @@ public class GenericUDFIndex extends Gen
}
// index has to be a primitive
- if (arguments[1] instanceof PrimitiveObjectInspector) {
- indexOI = (PrimitiveObjectInspector) arguments[1];
- } else {
+ if (!(arguments[1] instanceof PrimitiveObjectInspector)) {
throw new UDFArgumentTypeException(1, "Primitive Type is expected but "
+ arguments[1].getTypeName() + "\" is found");
}
-
+ PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) arguments[1];
+ ObjectInspector returnOI;
+ ObjectInspector indexOI;
if (mapOI != null) {
+ indexOI = ObjectInspectorConverters.getConvertedOI(
+ inputOI, mapOI.getMapKeyObjectInspector());
returnOI = mapOI.getMapValueObjectInspector();
- ObjectInspector keyOI = mapOI.getMapKeyObjectInspector();
- mapKeyPreferWritable = ((PrimitiveObjectInspector) keyOI)
- .preferWritable();
} else {
+ indexOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
returnOI = listOI.getListElementObjectInspector();
}
+ converter = ObjectInspectorConverters.getConverter(inputOI, indexOI);
return returnOI;
}
@@ -88,35 +90,16 @@ public class GenericUDFIndex extends Gen
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == 2);
- Object main = arguments[0].get();
Object index = arguments[1].get();
+ Object indexObject = converter.convert(index);
+ if (indexObject == null) {
+ return null;
+ }
if (mapOI != null) {
-
- Object indexObject;
- if (mapKeyPreferWritable) {
- indexObject = indexOI.getPrimitiveWritableObject(index);
- } else {
- indexObject = indexOI.getPrimitiveJavaObject(index);
- }
- return mapOI.getMapValueElement(main, indexObject);
-
- } else {
-
- assert (listOI != null);
- int intIndex = 0;
- try {
- intIndex = PrimitiveObjectInspectorUtils.getInt(index, indexOI);
- } catch (NullPointerException e) {
- // If index is null, we should return null.
- return null;
- } catch (NumberFormatException e) {
- // If index is not a number, we should return null.
- return null;
- }
- return listOI.getListElement(main, intIndex);
-
+ return mapOI.getMapValueElement(arguments[0].get(), indexObject);
}
+ return listOI.getListElement(arguments[0].get(), ((IntWritable)indexObject).get());
}
@Override
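Instead of branching on writable/Java keys and catching parse exceptions, the rewrite above normalizes the index once through a converter chosen at initialize() time; a null conversion result means a null answer. A minimal sketch of that converter pattern for the list case, where the index must become an IntWritable:

    // Sketch of the converter pattern; a string index is converted to IntWritable.
    ObjectInspector inputOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector indexOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    ObjectInspectorConverters.Converter converter =
        ObjectInspectorConverters.getConverter(inputOI, indexOI);
    IntWritable idx = (IntWritable) converter.convert("3"); // parses to 3
    Object bad = converter.convert("not-a-number");         // null, so evaluate() returns null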
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java Tue Oct 14 19:06:45 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.udf.generic;
+import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hive.serde2.obj
* Creates a TimestampWritable object using PrimitiveObjectInspectorConverter
*
*/
+@Description(name = "timestamp",
+value = "cast(date as timestamp) - Returns timestamp")
@VectorizedExpressions({CastLongToTimestampViaLongToLong.class,
CastDoubleToTimestampViaDoubleToLong.class, CastDecimalToTimestamp.class})
public class GenericUDFTimestamp extends GenericUDF {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java Tue Oct 14 19:06:45 2014
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hive.ql.udf.generic;
+import org.apache.hadoop.hive.ql.exec.Description;
+@Description(name = "to_utc_timestamp",
+ value = "to_utc_timestamp(timestamp, string timezone) - "
+ + "Assumes given timestamp is in given timezone and converts to UTC (as of Hive 0.8.0)")
public class GenericUDFToUtcTimestamp extends
GenericUDFFromUtcTimestamp {
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java Tue Oct 14 19:06:45 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -80,7 +79,7 @@ public class TestFunctionRegistry extend
}
private void implicit(TypeInfo a, TypeInfo b, boolean convertible) {
- assertEquals(convertible, FunctionRegistry.implicitConvertable(a,b));
+ assertEquals(convertible, FunctionRegistry.implicitConvertible(a, b));
}
public void testImplicitConversion() {
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Tue Oct 14 19:06:45 2014
@@ -18,17 +18,24 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -49,8 +60,14 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.junit.Test;
/**
* TestOperators.
@@ -274,7 +291,7 @@ public class TestOperators extends TestC
cd, sop);
op.initialize(new JobConf(TestOperators.class),
- new ObjectInspector[] {r[0].oi});
+ new ObjectInspector[]{r[0].oi});
// evaluate on row
for (int i = 0; i < 5; i++) {
@@ -314,7 +331,8 @@ public class TestOperators extends TestC
Configuration hconf = new JobConf(TestOperators.class);
HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
"hdfs:///testDir/testFile");
- IOContext.get().setInputPath(new Path("hdfs:///testDir/testFile"));
+ IOContext.get(hconf.get(Utilities.INPUT_NAME)).setInputPath(
+ new Path("hdfs:///testDir/testFile"));
// initialize pathToAliases
ArrayList<String> aliases = new ArrayList<String>();
@@ -379,4 +397,82 @@ public class TestOperators extends TestC
throw (e);
}
}
+
+ @Test
+ public void testFetchOperatorContextQuoting() throws Exception {
+ JobConf conf = new JobConf();
+ ArrayList<Path> list = new ArrayList<Path>();
+ list.add(new Path("hdfs://nn.example.com/fi\tl\\e\t1"));
+ list.add(new Path("hdfs://nn.example.com/file\t2"));
+ list.add(new Path("file:/file3"));
+ FetchOperator.setFetchOperatorContext(conf, list);
+ String[] parts =
+ conf.get(FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST).split("\t");
+ assertEquals(3, parts.length);
+ assertEquals("hdfs://nn.example.com/fi\\tl\\\\e\\t1", parts[0]);
+ assertEquals("hdfs://nn.example.com/file\\t2", parts[1]);
+ assertEquals("file:/file3", parts[2]);
+ }
+
+ /**
+ * A custom input format that checks to make sure that the fetch operator
+ * sets the required attributes.
+ */
+ public static class CustomInFmt extends TextInputFormat {
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int splits) throws IOException {
+
+ // ensure that the table properties were copied
+ assertEquals("val1", job.get("myprop1"));
+ assertEquals("val2", job.get("myprop2"));
+
+ // ensure that both of the partitions are in the complete list.
+ String[] dirs = job.get("hive.complete.dir.list").split("\t");
+ assertEquals(2, dirs.length);
+ assertEquals(true, dirs[0].endsWith("/state=CA"));
+ assertEquals(true, dirs[1].endsWith("/state=OR"));
+ return super.getSplits(job, splits);
+ }
+ }
+
+ @Test
+ public void testFetchOperatorContext() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.set("hive.support.concurrency", "false");
+ SessionState.start(conf);
+ String cmd = "create table fetchOp (id int, name string) " +
+ "partitioned by (state string) " +
+ "row format delimited fields terminated by '|' " +
+ "stored as " +
+ "inputformat 'org.apache.hadoop.hive.ql.exec.TestOperators$CustomInFmt' " +
+ "outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
+ "tblproperties ('myprop1'='val1', 'myprop2' = 'val2')";
+ Driver driver = new Driver();
+ driver.init();
+ CommandProcessorResponse response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+ List<Object> result = new ArrayList<Object>();
+
+ cmd = "load data local inpath '../data/files/employee.dat' " +
+ "overwrite into table fetchOp partition (state='CA')";
+ driver.init();
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+
+ cmd = "load data local inpath '../data/files/employee2.dat' " +
+ "overwrite into table fetchOp partition (state='OR')";
+ driver.init();
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+
+ cmd = "select * from fetchOp";
+ driver.init();
+ driver.setMaxRows(500);
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+ driver.getResults(result);
+ assertEquals(20, result.size());
+ driver.close();
+ }
}
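The assertions in testFetchOperatorContextQuoting above pin down the escaping scheme for the tab-separated directory list: literal backslashes are doubled first, then tab characters become a backslash-t pair. A hypothetical helper consistent with those assertions (the real logic lives in FetchOperator.setFetchOperatorContext):

    // Hypothetical sketch matching the test's expected output.
    static String escapeForDirectoryList(String path) {
      return path.replace("\\", "\\\\")   // '\'  -> "\\"  (must run first)
                 .replace("\t", "\\t");   // TAB  -> "\t"
    }
    // escapeForDirectoryList("hdfs://nn.example.com/fi\tl\\e\t1")
    //   -> "hdfs://nn.example.com/fi\\tl\\\\e\\t1", as asserted above.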
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Tue Oct 14 19:06:45 2014
@@ -26,6 +26,7 @@ import java.util.Random;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.apache.hadoop.hive.conf.HiveConf;
public class TestTezSessionPool {
@@ -157,4 +158,29 @@ public class TestTezSessionPool {
}
}
}
+
+ @Test
+ public void testCloseAndOpenDefault() throws Exception {
+ poolManager = new TestTezSessionPoolManager();
+ TezSessionState session = Mockito.mock(TezSessionState.class);
+ Mockito.when(session.isDefault()).thenReturn(false);
+
+ poolManager.closeAndOpen(session, conf, false);
+
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(conf, null);
+ }
+
+ @Test
+ public void testCloseAndOpenWithResources() throws Exception {
+ poolManager = new TestTezSessionPoolManager();
+ TezSessionState session = Mockito.mock(TezSessionState.class);
+ Mockito.when(session.isDefault()).thenReturn(false);
+ String[] extraResources = new String[] { "file:///tmp/foo.jar" };
+
+ poolManager.closeAndOpen(session, conf, extraResources, false);
+
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(conf, extraResources);
+ }
}
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Tue Oct 14 19:06:45 2014
@@ -30,9 +30,11 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.mapred.JobConf;
@@ -90,8 +93,11 @@ public class TestTezTask {
path = mock(Path.class);
when(path.getFileSystem(any(Configuration.class))).thenReturn(fs);
when(utils.getTezDir(any(Path.class))).thenReturn(path);
- when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
- any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() {
+ when(
+ utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
+ any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
+ anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer(
+ new Answer<Vertex>() {
@Override
public Vertex answer(InvocationOnMock invocation) throws Throwable {
@@ -101,8 +107,8 @@ public class TestTezTask {
}
});
- when(utils.createEdge(any(JobConf.class), any(Vertex.class),
- any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
+ when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class),
+ any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() {
@Override
public Edge answer(InvocationOnMock invocation) throws Throwable {
@@ -204,10 +210,11 @@ public class TestTezTask {
@Test
public void testSubmit() throws Exception {
DAG dag = DAG.create("test");
- task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
+ task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
+ new String[0], Collections.<String,LocalResource> emptyMap());
// validate close/reopen
- verify(sessionState, times(1)).open(any(HiveConf.class));
- verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
+ verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class));
+ verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043
verify(session, times(2)).submitDAG(any(DAG.class));
}
@@ -216,4 +223,54 @@ public class TestTezTask {
task.close(work, 0);
verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
}
+
+ @Test
+ public void testExistingSessionGetsStorageHandlerResources() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+ when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+ task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
+ verify(session).addAppMasterLocalFiles(resMap);
+ }
+
+ @Test
+ public void testExtraResourcesAddedToDag() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+ DAG dag = mock(DAG.class);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+ when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+ task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
+ verify(dag).addTaskLocalFiles(resMap);
+ }
+
+ @Test
+ public void testGetExtraLocalResources() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+
+ assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+ }
}
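
The three new tests split the resource-localization path into stages: getExtraLocalResources localizes the input/output jars and keys each LocalResource by its base name, updateSession ships that map to the application master of an already-open session, and addExtraResourcesToDag attaches it to the DAG as task-local files. A sketch of the first stage, assuming localizeTempFiles and getBaseName behave as the mocks describe:

    // Sketch under the mocked behavior above; the real TezTask may differ.
    Map<String, LocalResource> getExtraLocalResources(JobConf conf, Path path,
        String[] inputOutputJars) throws Exception {
      Map<String, LocalResource> result = new HashMap<String, LocalResource>();
      List<LocalResource> localized =
          utils.localizeTempFiles(path.toString(), conf, inputOutputJars);
      if (localized != null) {
        for (LocalResource lr : localized) {
          result.put(utils.getBaseName(lr), lr);  // e.g. "foo.jar" -> resource
        }
      }
      return result;
    }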
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Tue Oct 14 19:06:45 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -601,6 +602,30 @@ public class TestVectorGroupByOperator {
}
@Test
+ public void testCountReduce() throws HiveException {
+ testAggregateCountReduce(
+ 2,
+ Arrays.asList(new Long[]{}),
+ 0L);
+ testAggregateCountReduce(
+ 2,
+ Arrays.asList(new Long[]{0L}),
+ 0L);
+ testAggregateCountReduce(
+ 2,
+ Arrays.asList(new Long[]{0L,0L}),
+ 0L);
+ testAggregateCountReduce(
+ 2,
+ Arrays.asList(new Long[]{0L,1L,0L}),
+ 1L);
+ testAggregateCountReduce(
+ 2,
+ Arrays.asList(new Long[]{13L,0L,7L,19L}),
+ 39L);
+ }
+
+ @Test
public void testCountDecimal() throws HiveException {
testAggregateDecimal(
"Decimal",
@@ -1210,7 +1235,7 @@ public class TestVectorGroupByOperator {
"count",
2,
Arrays.asList(new Long[]{}),
- null);
+ 0L);
}
@Test
@@ -2027,6 +2052,17 @@ public class TestVectorGroupByOperator {
testAggregateCountStarIterable (fdr, expected);
}
+ public void testAggregateCountReduce (
+ int batchSize,
+ Iterable<Long> values,
+ Object expected) throws HiveException {
+
+ @SuppressWarnings("unchecked")
+ FakeVectorRowBatchFromLongIterables fdr = new FakeVectorRowBatchFromLongIterables(batchSize,
+ values);
+ testAggregateCountReduceIterable (fdr, expected);
+ }
+
public static interface Validator {
void validate (String key, Object expected, Object result);
@@ -2223,6 +2259,37 @@ public class TestVectorGroupByOperator {
validator.validate("_total", expected, result);
}
+ public void testAggregateCountReduceIterable (
+ Iterable<VectorizedRowBatch> data,
+ Object expected) throws HiveException {
+ Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+ mapColumnNames.put("A", 0);
+ VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+ GroupByDesc desc = buildGroupByDescType(ctx, "count", "A", TypeInfoFactory.longTypeInfo);
+ VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+ vectorDesc.setIsReduce(true);
+
+ VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+ FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+ vgo.initialize(null, null);
+
+ for (VectorizedRowBatch unit: data) {
+ vgo.processOp(unit, 0);
+ }
+ vgo.close(false);
+
+ List<Object> outBatchList = out.getCapturedRows();
+ assertNotNull(outBatchList);
+ assertEquals(1, outBatchList.size());
+
+ Object result = outBatchList.get(0);
+
+ Validator validator = getValidator("count");
+ validator.validate("_total", expected, result);
+ }
+
public void testAggregateStringIterable (
String aggregateName,
Iterable<VectorizedRowBatch> data,
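
The new count-reduce path explains the changed expectations above: with vectorDesc.setIsReduce(true) the operator receives partial counts produced by map-side aggregation, so it must sum the incoming values rather than count rows; hence {13, 0, 7, 19} yields 39, and (per the revised expectation earlier in the file) an empty input yields 0 rather than null. In scalar form the merge step amounts to:

    // Scalar equivalent of the reduce-side COUNT merge exercised above.
    static long mergeCounts(Iterable<Long> partialCounts) {
      long total = 0L;             // empty input -> 0, matching the revised test
      for (Long c : partialCounts) {
        total += c;                // sum partial counts instead of counting rows
      }
      return total;
    }
    // mergeCounts(Arrays.asList(13L, 0L, 7L, 19L)) == 39L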
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java Tue Oct 14 19:06:45 2014
@@ -90,7 +90,9 @@ public class StorageFormats {
* includes both native Hive storage formats as well as those enumerated in the
* ADDITIONAL_STORAGE_FORMATS table.
*
- * @return List of storage format as paramters.
+ * @return Collection of Object arrays, each containing (in order) the
+ * storage format name, the SerDe class name, the InputFormat class name,
+ * and the OutputFormat class name. The collection is used as the
+ * parameter set for JUnit parameterized tests.
*/
public static Collection<Object[]> asParameters() {
List<Object[]> parameters = new ArrayList<Object[]>();
@@ -130,5 +132,21 @@ public class StorageFormats {
return parameters;
}
+
+ /**
+ * Returns the names of the available storage formats.
+ *
+ * @return List of single-element Object arrays, each holding one storage
+ * format name, suitable as parameters for JUnit parameterized tests.
+ */
+ public static Collection<Object[]> names() {
+ List<Object[]> names = new ArrayList<Object[]>();
+ for (StorageFormatDescriptor descriptor : ServiceLoader.load(StorageFormatDescriptor.class)) {
+ String[] formatNames = new String[descriptor.getNames().size()];
+ formatNames = descriptor.getNames().toArray(formatNames);
+ String[] params = { formatNames[0] };
+ names.add(params);
+ }
+ return names;
+ }
}
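
Both helpers are shaped for JUnit's Parameterized runner: asParameters() feeds a four-argument constructor, names() a single-argument one. An illustrative (hypothetical) consumer of asParameters():

    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    // Hypothetical test class; only StorageFormats.asParameters() is real here.
    @RunWith(Parameterized.class)
    public class StorageFormatParamTest {
      private final String name;
      private final String serdeClass;
      private final String inputFormatClass;
      private final String outputFormatClass;

      public StorageFormatParamTest(String name, String serdeClass,
          String inputFormatClass, String outputFormatClass) {
        this.name = name;
        this.serdeClass = serdeClass;
        this.inputFormatClass = inputFormatClass;
        this.outputFormatClass = outputFormatClass;
      }

      @Parameters
      public static Collection<Object[]> data() {
        return StorageFormats.asParameters();
      }

      @Test
      public void createTableStoredAsFormat() {
        // e.g. run "create table t (x int) stored as " + name and assert success
      }
    }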
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Tue Oct 14 19:06:45 2014
@@ -217,11 +217,11 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+ // The two original buckets won't be in the obsolete list because we don't look at those
+ // until we have determined there is no base.
List<FileStatus> obsolete = dir.getObsolete();
- assertEquals(3, obsolete.size());
+ assertEquals(1, obsolete.size());
assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString());
assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
}
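
Reconstructed from the assertions (the mock directory setup itself is outside this hunk), the partition layout behind the revised expectations is roughly:

    /tbl/part1/000000_0   -- original bucket; not examined once a base is found
    /tbl/part1/000001_1   -- original bucket; skipped for the same reason
    /tbl/part1/base_5     -- the one obsolete entry, shadowed by base_10
    /tbl/part1/base_10    -- base chosen for ValidTxnListImpl("150:")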
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Tue Oct 14 19:06:45 2014
@@ -115,7 +115,8 @@ public class TestHiveBinarySearchRecordR
}
private void resetIOContext() {
- ioContext = IOContext.get();
+ conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
+ ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME));
ioContext.setUseSorted(false);
ioContext.setIsBinarySearching(false);
ioContext.setEndBinarySearch(false);
@@ -124,6 +125,7 @@ public class TestHiveBinarySearchRecordR
}
private void init() throws IOException {
+ conf = new JobConf();
resetIOContext();
rcfReader = mock(RCFileRecordReader.class);
when(rcfReader.next((LongWritable)anyObject(),
@@ -131,7 +133,6 @@ public class TestHiveBinarySearchRecordR
// Since the start is 0, and the length is 100, the first call to sync should be with the value
// 50 so return that for getPos()
when(rcfReader.getPos()).thenReturn(50L);
- conf = new JobConf();
conf.setBoolean("hive.input.format.sorted", true);
TableDesc tblDesc = Utilities.defaultTd;
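
The switch from IOContext.get() to IOContext.get(conf.get(Utilities.INPUT_NAME)) implies a registry of contexts keyed by input name rather than a single shared context. A sketch of such a registry, assuming the internals of the real IOContext (which are not shown in this diff):

    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only; the real IOContext implementation may differ.
    private static final ConcurrentHashMap<String, IOContext> CONTEXTS =
        new ConcurrentHashMap<String, IOContext>();

    public static IOContext get(String inputName) {
      IOContext ctx = CONTEXTS.get(inputName);
      if (ctx == null) {
        IOContext fresh = new IOContext();
        ctx = CONTEXTS.putIfAbsent(inputName, fresh);
        if (ctx == null) {
          ctx = fresh;
        }
      }
      return ctx;
    }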
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Tue Oct 14 19:06:45 2014
@@ -165,7 +165,7 @@ public class TestSymlinkTextInputFormat
+ " failed with exit code= " + ecode);
}
- String cmd = "select key from " + tblName;
+ String cmd = "select key*1 from " + tblName;
drv.compile(cmd);
//create scratch dir
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Tue Oct 14 19:06:45 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.io.sarg
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -118,7 +119,6 @@ public class TestInputOutputFormat {
TimeZone gmt = TimeZone.getTimeZone("GMT+0");
DATE_FORMAT.setTimeZone(gmt);
TIME_FORMAT.setTimeZone(gmt);
- TimeZone local = TimeZone.getDefault();
}
public static class BigRow implements Writable {
@@ -560,6 +560,12 @@ public class TestInputOutputFormat {
this.file = file;
}
+ /**
+ * Set the blocks and their locations for the file.
+ * Must be called after the stream is closed or the block length will be
+ * wrong.
+ * @param blocks the list of blocks
+ */
public void setBlocks(MockBlock... blocks) {
file.blocks = blocks;
int offset = 0;
@@ -580,12 +586,18 @@ public class TestInputOutputFormat {
file.content = new byte[file.length];
System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
}
+
+ @Override
+ public String toString() {
+ return "Out stream to " + file.toString();
+ }
}
public static class MockFileSystem extends FileSystem {
final List<MockFile> files = new ArrayList<MockFile>();
Path workingDir = new Path("/");
+ @SuppressWarnings("unused")
public MockFileSystem() {
// empty
}
@@ -620,7 +632,7 @@ public class TestInputOutputFormat {
return new FSDataInputStream(new MockInputStream(file));
}
}
- return null;
+ throw new IOException("File not found: " + path);
}
@Override
@@ -743,8 +755,12 @@ public class TestInputOutputFormat {
for(MockBlock block: file.blocks) {
if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
block.length, start, len) > 0) {
+ String[] topology = new String[block.hosts.length];
+ for(int i=0; i < topology.length; ++i) {
+ topology[i] = "/rack/ " + block.hosts[i];
+ }
result.add(new BlockLocation(block.hosts, block.hosts,
- block.offset, block.length));
+ topology, block.offset, block.length));
}
}
return result.toArray(new BlockLocation[result.size()]);
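
Supplying topology paths lets rack-aware split placement see each replica's network location; Hadoop's five-argument BlockLocation constructor takes names, hosts, topology paths, offset, and length. For one replica the mock now produces, in effect (values illustrative):

    import org.apache.hadoop.fs.BlockLocation;

    // Values are illustrative; the mock builds one entry per block above.
    BlockLocation loc = new BlockLocation(
        new String[] { "host1" },        // names
        new String[] { "host1" },        // hosts
        new String[] { "/rack/host1" },  // topology paths
        0L,                              // offset
        1024L);                          // length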
@@ -1209,7 +1225,8 @@ public class TestInputOutputFormat {
Path warehouseDir,
String tableName,
ObjectInspector objectInspector,
- boolean isVectorized
+ boolean isVectorized,
+ int partitions
) throws IOException {
Utilities.clearWorkMap();
JobConf conf = new JobConf();
@@ -1218,9 +1235,20 @@ public class TestInputOutputFormat {
conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
conf.set("fs.mock.impl", MockFileSystem.class.getName());
conf.set("mapred.mapper.class", ExecMapper.class.getName());
- Path root = new Path(warehouseDir, tableName + "/p=0");
+ Path root = new Path(warehouseDir, tableName);
+ // clean out previous contents
((MockFileSystem) root.getFileSystem(conf)).clear();
- conf.set("mapred.input.dir", root.toString());
+ // build partition strings
+ String[] partPath = new String[partitions];
+ StringBuilder buffer = new StringBuilder();
+ for(int p=0; p < partitions; ++p) {
+ partPath[p] = new Path(root, "p=" + p).toString();
+ if (p != 0) {
+ buffer.append(',');
+ }
+ buffer.append(partPath[p]);
+ }
+ conf.set("mapred.input.dir", buffer.toString());
StringBuilder columnIds = new StringBuilder();
StringBuilder columnNames = new StringBuilder();
StringBuilder columnTypes = new StringBuilder();
@@ -1239,6 +1267,8 @@ public class TestInputOutputFormat {
}
conf.set("hive.io.file.readcolumn.ids", columnIds.toString());
conf.set("partition_columns", "p");
+ conf.set(serdeConstants.LIST_COLUMNS, columnNames.toString());
+ conf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
MockFileSystem fs = (MockFileSystem) warehouseDir.getFileSystem(conf);
fs.clear();
@@ -1249,9 +1279,6 @@ public class TestInputOutputFormat {
tblProps.put("columns.types", columnTypes.toString());
TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
tblProps);
- LinkedHashMap<String, String> partSpec =
- new LinkedHashMap<String, String>();
- PartitionDesc part = new PartitionDesc(tbl, partSpec);
MapWork mapWork = new MapWork();
mapWork.setVectorMode(isVectorized);
@@ -1260,11 +1287,16 @@ public class TestInputOutputFormat {
new LinkedHashMap<String, ArrayList<String>>();
ArrayList<String> aliases = new ArrayList<String>();
aliases.add(tableName);
- aliasMap.put(root.toString(), aliases);
- mapWork.setPathToAliases(aliasMap);
LinkedHashMap<String, PartitionDesc> partMap =
new LinkedHashMap<String, PartitionDesc>();
- partMap.put(root.toString(), part);
+ for(int p=0; p < partitions; ++p) {
+ aliasMap.put(partPath[p], aliases);
+ LinkedHashMap<String, String> partSpec =
+ new LinkedHashMap<String, String>();
+ PartitionDesc part = new PartitionDesc(tbl, partSpec);
+ partMap.put(partPath[p], part);
+ }
+ mapWork.setPathToAliases(aliasMap);
mapWork.setPathToPartitionInfo(partMap);
mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
mapWork.setScratchColumnVectorTypes(new HashMap<String,
@@ -1285,6 +1317,7 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
+ @SuppressWarnings("unchecked")
public void testVectorization() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1294,7 +1327,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorization", inspector, true);
+ "vectorization", inspector, true, 1);
// write the orc file to the mock file system
Writer writer =
@@ -1332,6 +1365,7 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
+ @SuppressWarnings("unchecked")
public void testVectorizationWithBuckets() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1341,7 +1375,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorBuckets", inspector, true);
+ "vectorBuckets", inspector, true, 1);
// write the orc file to the mock file system
Writer writer =
@@ -1377,10 +1411,11 @@ public class TestInputOutputFormat {
// test acid with vectorization, no combine
@Test
+ @SuppressWarnings("unchecked")
public void testVectorizationWithAcid() throws Exception {
StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorizationAcid", inspector, true);
+ "vectorizationAcid", inspector, true, 1);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1444,6 +1479,7 @@ public class TestInputOutputFormat {
// test non-vectorized, non-acid, combine
@Test
+ @SuppressWarnings("unchecked")
public void testCombinationInputFormat() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1453,7 +1489,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combination", inspector, false);
+ "combination", inspector, false, 1);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1516,17 +1552,25 @@ public class TestInputOutputFormat {
public void testCombinationInputFormatWithAcid() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
+ final int PARTITIONS = 2;
+ final int BUCKETS = 3;
synchronized (TestOrcFile.class) {
inspector = (StructObjectInspector)
ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combinationAcid", inspector, false);
+ "combinationAcid", inspector, false, PARTITIONS);
// write the orc file to the mock file system
- Path partDir = new Path(conf.get("mapred.input.dir"));
- OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
+ Path[] partDir = new Path[PARTITIONS];
+ String[] paths = conf.getStrings("mapred.input.dir");
+ for(int p=0; p < PARTITIONS; ++p) {
+ partDir[p] = new Path(paths[p]);
+ }
+
+ // write a base file in partition 0
+ OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(0).inspector(inspector));
for(int i=0; i < 10; ++i) {
@@ -1534,31 +1578,68 @@ public class TestInputOutputFormat {
}
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
+
MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
- int length0 = outputStream.file.length;
- writer = new OrcRecordUpdater(partDir,
+ outputStream.setBlocks(new MockBlock("host1", "host2"));
+
+ // write a second base bucket (bucket 1) in partition 0
+ writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(1).inspector(inspector));
for(int i=10; i < 20; ++i) {
writer.insert(10, new MyRow(i, 2*i));
}
- baseWriter = (WriterImpl) writer.getWriter();
+ WriterImpl secondBaseWriter = (WriterImpl) writer.getWriter();
+ outputStream = (MockOutputStream) secondBaseWriter.getStream();
writer.close(false);
- outputStream = (MockOutputStream) baseWriter.getStream();
outputStream.setBlocks(new MockBlock("host1", "host2"));
+ // write three files in partition 1
+ for(int bucket=0; bucket < BUCKETS; ++bucket) {
+ Writer orc = OrcFile.createWriter(
+ new Path(partDir[1], "00000" + bucket + "_0"),
+ OrcFile.writerOptions(conf)
+ .blockPadding(false)
+ .bufferSize(1024)
+ .inspector(inspector));
+ orc.addRow(new MyRow(1, 2));
+ outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+ orc.close();
+ outputStream.setBlocks(new MockBlock("host3", "host4"));
+ }
+
// call getsplits
+ conf.setInt(hive_metastoreConstants.BUCKET_COUNT, BUCKETS);
HiveInputFormat<?,?> inputFormat =
new CombineHiveInputFormat<WritableComparable, Writable>();
- try {
- InputSplit[] splits = inputFormat.getSplits(conf, 1);
- assertTrue("shouldn't reach here", false);
- } catch (IOException ioe) {
- assertEquals("CombineHiveInputFormat is incompatible"
- + " with ACID tables. Please set hive.input.format=org.apache.hadoop"
- + ".hive.ql.io.HiveInputFormat",
- ioe.getMessage());
+ InputSplit[] splits = inputFormat.getSplits(conf, 1);
+ assertEquals(3, splits.length);
+ HiveInputFormat.HiveInputSplit split =
+ (HiveInputFormat.HiveInputSplit) splits[0];
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ split.inputFormatClassName());
+ assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
+ split.getPath().toString());
+ assertEquals(0, split.getStart());
+ assertEquals(580, split.getLength());
+ split = (HiveInputFormat.HiveInputSplit) splits[1];
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ split.inputFormatClassName());
+ assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
+ split.getPath().toString());
+ assertEquals(0, split.getStart());
+ assertEquals(601, split.getLength());
+ CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
+ (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
+ assertEquals(BUCKETS, combineSplit.getNumPaths());
+ for(int bucket=0; bucket < BUCKETS; ++bucket) {
+ assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
+ combineSplit.getPath(bucket).toString());
+ assertEquals(0, combineSplit.getOffset(bucket));
+ assertEquals(225, combineSplit.getLength(bucket));
}
+ String[] hosts = combineSplit.getLocations();
+ assertEquals(2, hosts.length);
}
@Test