Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [3/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/ ...
Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Sep 8 04:38:17 2014
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -43,7 +44,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.hive.conf.Validator.PatternSet;
import org.apache.hadoop.hive.conf.Validator.RangeValidator;
+import org.apache.hadoop.hive.conf.Validator.RatioValidator;
import org.apache.hadoop.hive.conf.Validator.StringSet;
+import org.apache.hadoop.hive.conf.Validator.TimeValidator;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
@@ -212,11 +215,18 @@ public class HiveConf extends Configurat
PLAN_SERIALIZATION("hive.plan.serialization.format", "kryo",
"Query plan format serialization between client and task nodes. \n" +
"Two supported values are : kryo and javaXML. Kryo is default."),
- SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive-${system:user.name}", "Scratch space for Hive jobs"),
+ SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive",
+ "HDFS root scratch dir for Hive jobs which gets created with 777 permission. " +
+ "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
+ "with ${hive.scratch.dir.permission}."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
- SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700", ""),
+ DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir",
+ "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources",
+ "Temporary local directory for added resources in the remote file system."),
+ SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700",
+ "The permission for the user specific scratch directories that get created."),
SUBMITVIACHILD("hive.exec.submitviachild", false, ""),
SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true,
"Determines whether local tasks (typically mapjoin hashtable generation phase) runs in \n" +
@@ -280,9 +290,6 @@ public class HiveConf extends Configurat
"Maximum number of dynamic partitions allowed to be created in each mapper/reducer node."),
MAXCREATEDFILES("hive.exec.max.created.files", 100000L,
"Maximum number of HDFS files created by all mappers/reducers in a MapReduce job."),
- DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir",
- "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources",
- "Temporary local directory for added resources in the remote file system."),
DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__",
"The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. \n" +
"This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" +
@@ -361,9 +368,11 @@ public class HiveConf extends Configurat
METASTORETHRIFTFAILURERETRIES("hive.metastore.failure.retries", 1,
"Number of retries upon failure of Thrift metastore calls"),
- METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", 1,
+ METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", "1s",
+ new TimeValidator(TimeUnit.SECONDS),
"Number of seconds for the client to wait between consecutive connection attempts"),
- METASTORE_CLIENT_SOCKET_TIMEOUT("hive.metastore.client.socket.timeout", 600,
+ METASTORE_CLIENT_SOCKET_TIMEOUT("hive.metastore.client.socket.timeout", "600s",
+ new TimeValidator(TimeUnit.SECONDS),
"MetaStore Client socket timeout in seconds"),
METASTOREPWD("javax.jdo.option.ConnectionPassword", "mine",
"password to use against metastore database"),
@@ -374,11 +383,10 @@ public class HiveConf extends Configurat
METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL",
"jdbc:derby:;databaseName=metastore_db;create=true",
"JDBC connect string for a JDBC metastore"),
-
HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1,
- "The number of times to retry a HMSHandler call if there were a connection error"),
- HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", 1000,
- "The number of milliseconds between HMSHandler retry attempts"),
+ "The number of times to retry a HMSHandler call if there were a connection error."),
+ HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", "1000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS), "The time between HMSHandler retry attempts on failure."),
HMSHANDLERFORCERELOADCONF("hive.hmshandler.force.reload.conf", false,
"Whether to force reloading of the HMSHandler configuration (including\n" +
"the connection URL, before the next metastore query that accesses the\n" +
@@ -473,10 +481,12 @@ public class HiveConf extends Configurat
"for operations like drop-partition (disallow the drop-partition if the user in\n" +
"question doesn't have permissions to delete the corresponding directory\n" +
"on the storage)."),
- METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq", 0L,
- "Frequency at which timer task runs to purge expired events in metastore(in seconds)."),
- METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", 0L,
- "Duration after which events expire from events table (in seconds)"),
+ METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq", "0s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Frequency at which timer task runs to purge expired events in metastore."),
+ METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Duration after which events expire from events table"),
METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true,
"In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" +
"the client's reported user and group permissions. Note that this property must be set on \n" +
@@ -580,6 +590,11 @@ public class HiveConf extends Configurat
HIVEJAR("hive.jar.path", "", ""),
HIVEAUXJARS("hive.aux.jars.path", "", ""),
+ // reloadable jars
+ HIVERELOADABLEJARS("hive.reloadable.aux.jars.path", "",
+ "Jars can be renewed by executing reload command. And these jars can be "
+ + "used as the auxiliary classes like creating a UDF or SerDe."),
+
// hive added files and jars
HIVEADDEDFILES("hive.added.files.path", "", ""),
HIVEADDEDJARS("hive.added.jars.path", "", ""),
@@ -588,8 +603,9 @@ public class HiveConf extends Configurat
HIVE_CURRENT_DATABASE("hive.current.database", "", "Database name used by current session. Internal usage only.", true),
// for hive script operator
- HIVES_AUTO_PROGRESS_TIMEOUT("hive.auto.progress.timeout", 0,
- "How long to run autoprogressor for the script/UDTF operators (in seconds).\n" +
+ HIVES_AUTO_PROGRESS_TIMEOUT("hive.auto.progress.timeout", "0s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "How long to run autoprogressor for the script/UDTF operators.\n" +
"Set to 0 for forever."),
HIVETABLENAME("hive.table.name", "", ""),
HIVEPARTITIONNAME("hive.partition.name", "", ""),
@@ -698,10 +714,9 @@ public class HiveConf extends Configurat
"because this may prevent TaskTracker from killing tasks with infinite loops."),
HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile", new StringSet("TextFile", "SequenceFile", "RCfile", "ORC"),
- "Default file format for CREATE TABLE statement. \n" +
- "Options are TextFile, SequenceFile, RCfile and ORC. Users can explicitly override it by CREATE TABLE ... STORED AS [FORMAT]"),
+ "Default file format for CREATE TABLE statement. Users can explicitly override it by CREATE TABLE ... STORED AS [FORMAT]"),
HIVEQUERYRESULTFILEFORMAT("hive.query.result.fileformat", "TextFile", new StringSet("TextFile", "SequenceFile", "RCfile"),
- "Default file format for storing result of the query. Allows TextFile, SequenceFile and RCfile"),
+ "Default file format for storing result of the query."),
HIVECHECKFILEFORMAT("hive.fileformat.check", true, "Whether to check file format or not when loading data files"),
// default serde for rcfile
@@ -728,8 +743,9 @@ public class HiveConf extends Configurat
"Whether to log the plan's progress every time a job's progress is checked.\n" +
"These logs are written to the location specified by hive.querylog.location"),
- HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL("hive.querylog.plan.progress.interval", 60000L,
- "The interval to wait between logging the plan's progress in milliseconds.\n" +
+ HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL("hive.querylog.plan.progress.interval", "60000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "The interval to wait between logging the plan's progress.\n" +
"If there is a whole number percentage change in the progress of the mappers or the reducers,\n" +
"the progress is logged regardless of this value.\n" +
"The actual interval will be the ceiling of (this value divided by the value of\n" +
@@ -809,7 +825,7 @@ public class HiveConf extends Configurat
" config (hive.exec.orc.block.padding.tolerance)."),
HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
"org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat",
- "Input file format to use for ORC stripe level merging (for internal use only)"),
+ "Input file format to use for ORC stripe level merging (for internal use only)"),
HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
"hive.merge.current.job.has.dynamic.partitions", false, ""),
@@ -838,6 +854,10 @@ public class HiveConf extends Configurat
"If the number of keys in a dictionary is greater than this fraction of the total number of\n" +
"non-null rows, turn off dictionary encoding. Use 1 to always use dictionary encoding."),
HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000, "Define the default ORC index stride"),
+ HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK("hive.orc.row.index.stride.dictionary.check", true,
+ "If enabled dictionary check will happen after first row index stride (default 10000 rows)\n" +
+ "else dictionary check will happen before writing first stripe. In both cases, the decision\n" +
+ "to use dictionary or not will be retained thereafter."),
HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024, "Define the default ORC buffer size"),
HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true, "Define the default block padding"),
HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f,
@@ -850,8 +870,7 @@ public class HiveConf extends Configurat
HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED", new StringSet("SPEED", "COMPRESSION"),
"Define the encoding strategy to use while writing data. Changing this will\n" +
"only affect the light weight encoding for integers. This flag will not\n" +
- "change the compression level of higher level compression codec (like ZLIB).\n" +
- "Possible options are SPEED and COMPRESSION."),
+ "change the compression level of higher level compression codec (like ZLIB)."),
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
"If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
@@ -1039,9 +1058,11 @@ public class HiveConf extends Configurat
"When enabled dynamic partitioning column will be globally sorted.\n" +
"This way we can keep only one record writer open for each partition value\n" +
"in the reducer thereby reducing the memory pressure on reducers."),
- HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, ""),
- HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, ""),
- HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, ""),
+
+ HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."),
+ HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."),
+ HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, new RatioValidator(),
+ "Probability with which a row will be chosen."),
// whether to optimize union followed by select followed by filesink
// It creates sub-directories in the final output, so should not be turned on in systems
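For illustration, a minimal sketch (not part of this commit) of how the RatioValidator now attached to hive.optimize.sampling.orderby.percent behaves; note that this patch also relaxes its range check from exclusive (0,1) to inclusive [0,1] (see the Validator.java hunk below):

    RatioValidator ratio = new RatioValidator();
    ratio.validate("0.1");  // null: a valid sampling probability
    ratio.validate("1");    // null: the boundary values 0 and 1 are accepted after this change
    ratio.validate("1.5");  // returns "Invalid ratio 1.5, which should be in between 0 to 1"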
@@ -1108,16 +1129,17 @@ public class HiveConf extends Configurat
"The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is custom type."),
HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator", "",
"The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is custom type."),
- HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout", 30,
- "Timeout value (number of seconds) used by JDBC connection and statements."),
+ HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout", "30s", new TimeValidator(TimeUnit.SECONDS),
+ "Timeout value used by JDBC connection and statements."),
HIVE_STATS_ATOMIC("hive.stats.atomic", false,
"whether to update metastore stats only if all stats are available"),
HIVE_STATS_RETRIES_MAX("hive.stats.retries.max", 0,
"Maximum number of retries when stats publisher/aggregator got an exception updating intermediate database. \n" +
"Default is no tries on failures."),
- HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait", 3000,
- "The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by " +
- "baseWindow * failures baseWindow * (failure 1) * (random number between [0.0,1.0])."),
+ HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait", "3000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "The base waiting window before the next retry. The actual wait time is calculated by " +
+ "baseWindow * failures baseWindow * (failure + 1) * (random number between [0.0,1.0])."),
HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true,
"should the raw data size be collected when analyzing tables"),
CLIENT_STATS_COUNTERS("hive.client.stats.counters", "",
@@ -1227,8 +1249,9 @@ public class HiveConf extends Configurat
"The number of times you want to try to get all the locks"),
HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10,
"The number of times you want to retry to do one unlock"),
- HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60,
- "The sleep time (in seconds) between various retries"),
+ HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "The sleep time between various retries"),
HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false,
"This param is to control whether or not only do lock on queries\n" +
"that need to execute at least one mapred job."),
@@ -1248,8 +1271,8 @@ public class HiveConf extends Configurat
// Transactions
HIVE_TXN_MANAGER("hive.txn.manager",
"org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", ""),
- HIVE_TXN_TIMEOUT("hive.txn.timeout", 300,
- "time after which transactions are declared aborted if the client has not sent a heartbeat, in seconds."),
+ HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS),
+ "time after which transactions are declared aborted if the client has not sent a heartbeat."),
HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
"Maximum number of transactions that can be fetched in one call to open_txns().\n" +
@@ -1263,12 +1286,14 @@ public class HiveConf extends Configurat
HIVE_COMPACTOR_WORKER_THREADS("hive.compactor.worker.threads", 0,
"Number of compactor worker threads to run on this metastore instance."),
- HIVE_COMPACTOR_WORKER_TIMEOUT("hive.compactor.worker.timeout", 86400L,
- "Time in seconds, before a given compaction in working state is declared a failure\n" +
+ HIVE_COMPACTOR_WORKER_TIMEOUT("hive.compactor.worker.timeout", "86400s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Time before a given compaction in working state is declared a failure\n" +
"and returned to the initiated state."),
- HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", 300L,
- "Time in seconds between checks to see if any partitions need compacted.\n" +
+ HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", "300s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Time between checks to see if any partitions need compacted.\n" +
"This should be kept high because each check for compaction requires many calls against the NameNode."),
HIVE_COMPACTOR_DELTA_NUM_THRESHOLD("hive.compactor.delta.num.threshold", 10,
@@ -1305,7 +1330,7 @@ public class HiveConf extends Configurat
"Currently the query should be single sourced not having any subquery and should not have\n" +
"any aggregations or distincts (which incurs RS), lateral views and joins.\n" +
"1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only\n" +
- "2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)\n"
+ "2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)"
),
HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", 1073741824L,
"Input threshold for applying hive.fetch.task.conversion. If target table is native, input length\n" +
@@ -1477,12 +1502,12 @@ public class HiveConf extends Configurat
"table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
"get old behavior, if desired. See, test-case in patch for HIVE-6689."),
- HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, Long.MAX_VALUE),
+ HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
"This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" +
"The default of 30 will keep trying for 30 minutes."),
HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"),
- "Server transport mode. \"binary\" or \"http\""),
+ "Transport mode of HiveServer2."),
// http (over thrift) transport settings
HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
@@ -1493,11 +1518,13 @@ public class HiveConf extends Configurat
"Minimum number of worker threads when in HTTP mode."),
HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500,
"Maximum number of worker threads when in HTTP mode."),
- HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
- "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."),
- HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60,
- "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " +
- "excess threads are killed after this time interval."),
+ HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", "1800s",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Maximum idle time for a connection on the server when in HTTP mode."),
+ HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " +
+ "excessive threads are killed after this time interval."),
// binary transport settings
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
@@ -1520,23 +1547,26 @@ public class HiveConf extends Configurat
"Minimum number of Thrift worker threads"),
HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500,
"Maximum number of Thrift worker threads"),
- HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60,
- "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " +
- "excess threads are killed after this time interval."),
+ HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " +
+ "excessive threads are killed after this time interval."),
// Configuration for async thread pool in SessionManager
HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
"Number of threads in the async thread pool for HiveServer2"),
- HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10,
- "Time (in seconds) for which HiveServer2 shutdown will wait for async"),
+ HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", "10s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Maximum time for which HiveServer2 shutdown will wait for async"),
HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE("hive.server2.async.exec.wait.queue.size", 100,
"Size of the wait queue for async thread pool in HiveServer2.\n" +
"After hitting this limit, the async thread pool will reject new requests."),
- HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10,
- "Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait\n" +
- "for a new task to arrive before terminating"),
- HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L,
- "Time in milliseconds that HiveServer2 will wait,\n" +
- "before responding to asynchronous calls that use long polling"),
+ HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", "10s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Time that an idle HiveServer2 async thread (from the thread pool) will wait for a new task\n" +
+ "to arrive before terminating"),
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", "5000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Time that HiveServer2 will wait before responding to asynchronous calls that use long polling"),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
@@ -1579,8 +1609,8 @@ public class HiveConf extends Configurat
"must be a proper implementation of the interface\n" +
"org.apache.hive.service.auth.PasswdAuthenticationProvider. HiveServer2\n" +
"will call its Authenticate(user, passed) method to authenticate requests.\n" +
- "The implementation may optionally extend Hadoop's\n" +
- "org.apache.hadoop.conf.Configured class to grab Hive's Configuration object."),
+ "The implementation may optionally implement Hadoop's\n" +
+ "org.apache.hadoop.conf.Configurable class to grab Hive's Configuration object."),
HIVE_SERVER2_PAM_SERVICES("hive.server2.authentication.pam.services", null,
"List of the underlying pam services that should be used when auth type is PAM\n" +
"A file with the same name must exist in /etc/pam.d"),
@@ -1598,9 +1628,21 @@ public class HiveConf extends Configurat
HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", ""),
HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", "", ""),
- HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,compile",
+ HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
+ HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0ms",
new TimeValidator(TimeUnit.MILLISECONDS, 3000L, true, null, false),
+ "The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."),
+ HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "0ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value."),
+ HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", "0ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Operation will be closed when it's not accessed for this duration of time, which can be disabled by setting to zero value.\n" +
+ " With positive value, it's checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR).\n" +
+ " With negative value, it's checked for all of the operations regardless of state."),
+
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
"hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
"Comma separated list of configuration options which are immutable at runtime"),
@@ -1658,8 +1700,9 @@ public class HiveConf extends Configurat
"Enable list bucketing optimizer. Default value is false so that we disable it by default."),
// Allow TCP Keep alive socket option for for HiveServer or a maximum timeout for the socket.
- SERVER_READ_SOCKET_TIMEOUT("hive.server.read.socket.timeout", 10,
- "Timeout for the HiveServer to close the connection if no response from the client in N seconds, defaults to 10 seconds."),
+ SERVER_READ_SOCKET_TIMEOUT("hive.server.read.socket.timeout", "10s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Timeout for the HiveServer to close the connection if no response from the client. By default, 10 seconds."),
SERVER_TCP_KEEP_ALIVE("hive.server.tcp.keepalive", true,
"Whether to enable TCP keepalive for the Hive Server. Keepalive will prevent accumulation of half-open connections."),
@@ -1718,8 +1761,9 @@ public class HiveConf extends Configurat
"turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
"over Tez without the pool of sessions."),
- HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column", new PatternSet("none", "column"),
- "Whether to use quoted identifier. 'none' ot 'column' can be used. \n" +
+ HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column",
+ new StringSet("none", "column"),
+ "Whether to use quoted identifier. 'none' or 'column' can be used. \n" +
" none: default(past) behavior. Implies only alphaNumeric and underscore are valid characters in identifiers.\n" +
" column: implies column names can contain any character."
),
@@ -1739,8 +1783,9 @@ public class HiveConf extends Configurat
HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true,
"Check if a plan contains a Cross Product. If there is one, output a warning to the Session's console."),
- HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L,
- "Time in milliseconds to wait for another thread to localize the same resource for hive-tez."),
+ HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", "5000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Time to wait for another thread to localize the same resource for hive-tez."),
HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5,
"The number of attempts waiting for localizing a resource in hive-tez."),
TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false,
@@ -1751,7 +1796,15 @@ public class HiveConf extends Configurat
"When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges."),
TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
"When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" +
- "of reducers that tez specifies.")
+ "of reducers that tez specifies."),
+ TEZ_DYNAMIC_PARTITION_PRUNING(
+ "hive.tez.dynamic.partition.pruning", true,
+ "When dynamic pruning is enabled, joins on partition keys will be processed by sending events from the processing " +
+ "vertices to the tez application master. These events will be used to prune unnecessary partitions."),
+ TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE("hive.tez.dynamic.partition.pruning.max.event.size", 1*1024*1024L,
+ "Maximum size of events sent by processors in dynamic pruning. If this size is crossed no pruning will take place."),
+ TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.parition.pruning.max.data.size", 100*1024*1024L,
+ "Maximum total data size of events in dynamic pruning.")
;
public final String varname;
@@ -1850,11 +1903,29 @@ public class HiveConf extends Configurat
return validator == null ? null : validator.validate(value);
}
+ public String validatorDescription() {
+ return validator == null ? null : validator.toDescription();
+ }
+
public String typeString() {
- return valType.typeString();
+ String type = valType.typeString();
+ if (valType == VarType.STRING && validator != null) {
+ if (validator instanceof TimeValidator) {
+ type += "(TIME)";
+ }
+ }
+ return type;
+ }
+
+ public String getRawDescription() {
+ return description;
}
public String getDescription() {
+ String validator = validatorDescription();
+ if (validator != null) {
+ return validator + ".\n" + description;
+ }
return description;
}
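For illustration, a minimal sketch (not part of this commit) of the effect of this change: when a validator is present, the rendered description now leads with the validator's expectations:

    // For METASTORE_CLIENT_SOCKET_TIMEOUT ("600s" with TimeValidator(SECONDS)), this yields roughly:
    //   Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is sec if not specified.
    //   MetaStore Client socket timeout in seconds
    String desc = HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.getDescription();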
@@ -1991,6 +2062,82 @@ public class HiveConf extends Configurat
setIntVar(this, var, val);
}
+ public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
+ return toTime(getVar(conf, var), getDefaultTimeUnit(var), outUnit);
+ }
+
+ public static void setTimeVar(Configuration conf, ConfVars var, long time, TimeUnit timeunit) {
+ assert (var.valClass == String.class) : var.varname;
+ conf.set(var.varname, time + stringFor(timeunit));
+ }
+
+ public long getTimeVar(ConfVars var, TimeUnit outUnit) {
+ return getTimeVar(this, var, outUnit);
+ }
+
+ public void setTimeVar(ConfVars var, long time, TimeUnit outUnit) {
+ setTimeVar(this, var, time, outUnit);
+ }
+
+ private static TimeUnit getDefaultTimeUnit(ConfVars var) {
+ TimeUnit inputUnit = null;
+ if (var.validator instanceof TimeValidator) {
+ inputUnit = ((TimeValidator)var.validator).getTimeUnit();
+ }
+ return inputUnit;
+ }
+
+ public static long toTime(String value, TimeUnit inputUnit, TimeUnit outUnit) {
+ String[] parsed = parseTime(value.trim());
+ return outUnit.convert(Long.valueOf(parsed[0].trim()), unitFor(parsed[1].trim(), inputUnit));
+ }
+
+ private static String[] parseTime(String value) {
+ char[] chars = value.toCharArray();
+ int i = 0;
+ for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) {
+ }
+ return new String[] {value.substring(0, i), value.substring(i)};
+ }
+
+ public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
+ unit = unit.trim().toLowerCase();
+ if (unit.isEmpty()) {
+ if (defaultUnit == null) {
+ throw new IllegalArgumentException("Time unit is not specified");
+ }
+ return defaultUnit;
+ } else if (unit.equals("d") || unit.startsWith("day")) {
+ return TimeUnit.DAYS;
+ } else if (unit.equals("h") || unit.startsWith("hour")) {
+ return TimeUnit.HOURS;
+ } else if (unit.equals("m") || unit.startsWith("min")) {
+ return TimeUnit.MINUTES;
+ } else if (unit.equals("s") || unit.startsWith("sec")) {
+ return TimeUnit.SECONDS;
+ } else if (unit.equals("ms") || unit.startsWith("msec")) {
+ return TimeUnit.MILLISECONDS;
+ } else if (unit.equals("us") || unit.startsWith("usec")) {
+ return TimeUnit.MICROSECONDS;
+ } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+ return TimeUnit.NANOSECONDS;
+ }
+ throw new IllegalArgumentException("Invalid time unit " + unit);
+ }
+
+ public static String stringFor(TimeUnit timeunit) {
+ switch (timeunit) {
+ case DAYS: return "day";
+ case HOURS: return "hour";
+ case MINUTES: return "min";
+ case SECONDS: return "sec";
+ case MILLISECONDS: return "msec";
+ case MICROSECONDS: return "usec";
+ case NANOSECONDS: return "nsec";
+ }
+ throw new IllegalArgumentException("Invalid timeunit " + timeunit);
+ }
+
public static long getLongVar(Configuration conf, ConfVars var) {
assert (var.valClass == Long.class) : var.varname;
return conf.getLong(var.varname, var.defaultLongVal);
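For illustration, a minimal sketch (not part of this commit) of the parsing semantics of the helpers above; the expected results follow directly from toTime/unitFor/stringFor as written:

    HiveConf.toTime("600s", null, TimeUnit.MILLISECONDS);        // 600000: an explicit suffix wins
    HiveConf.toTime("600", TimeUnit.SECONDS, TimeUnit.SECONDS);  // 600: a bare number uses the default unit
    HiveConf.toTime("5000ms", null, TimeUnit.SECONDS);           // 5
    HiveConf.toTime("600", null, TimeUnit.SECONDS);              // throws: no suffix and no default unit
    HiveConf.unitFor("min", null);                               // TimeUnit.MINUTES
    HiveConf.stringFor(TimeUnit.MILLISECONDS);                   // "msec"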
Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/Validator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/Validator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/Validator.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/Validator.java Mon Sep 8 04:38:17 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
@@ -31,57 +32,85 @@ public interface Validator {
String validate(String value);
- static class StringSet implements Validator {
+ String toDescription();
+ class StringSet implements Validator {
+
+ private final boolean caseSensitive;
private final Set<String> expected = new LinkedHashSet<String>();
public StringSet(String... values) {
+ this(false, values);
+ }
+
+ public StringSet(boolean caseSensitive, String... values) {
+ this.caseSensitive = caseSensitive;
for (String value : values) {
- expected.add(value.toLowerCase());
+ expected.add(caseSensitive ? value : value.toLowerCase());
}
}
@Override
public String validate(String value) {
- if (value == null || !expected.contains(value.toLowerCase())) {
+ if (value == null || !expected.contains(caseSensitive ? value : value.toLowerCase())) {
return "Invalid value.. expects one of " + expected;
}
return null;
}
+
+ @Override
+ public String toDescription() {
+ return "Expects one of " + expected;
+ }
}
- static enum RANGE_TYPE {
+ enum TYPE {
INT {
@Override
protected boolean inRange(String value, Object lower, Object upper) {
int ivalue = Integer.parseInt(value);
- return (Integer)lower <= ivalue && ivalue <= (Integer)upper;
+ if (lower != null && ivalue < (Integer)lower) {
+ return false;
+ }
+ if (upper != null && ivalue > (Integer)upper) {
+ return false;
+ }
+ return true;
}
},
LONG {
@Override
protected boolean inRange(String value, Object lower, Object upper) {
long lvalue = Long.parseLong(value);
- return (Long)lower <= lvalue && lvalue <= (Long)upper;
+ if (lower != null && lvalue < (Long)lower) {
+ return false;
+ }
+ if (upper != null && lvalue > (Long)upper) {
+ return false;
+ }
+ return true;
}
},
FLOAT {
@Override
protected boolean inRange(String value, Object lower, Object upper) {
float fvalue = Float.parseFloat(value);
- return (Float)lower <= fvalue && fvalue <= (Float)upper;
+ if (lower != null && fvalue < (Float)lower) {
+ return false;
+ }
+ if (upper != null && fvalue > (Float)upper) {
+ return false;
+ }
+ return true;
}
};
- public static RANGE_TYPE valueOf(Object lower, Object upper) {
- if (lower instanceof Integer && upper instanceof Integer) {
- assert (Integer)lower < (Integer)upper;
+ public static TYPE valueOf(Object lower, Object upper) {
+ if (lower instanceof Integer || upper instanceof Integer) {
return INT;
- } else if (lower instanceof Long && upper instanceof Long) {
- assert (Long)lower < (Long)upper;
+ } else if (lower instanceof Long || upper instanceof Long) {
return LONG;
- } else if (lower instanceof Float && upper instanceof Float) {
- assert (Float)lower < (Float)upper;
+ } else if (lower instanceof Float || upper instanceof Float) {
return FLOAT;
}
throw new IllegalArgumentException("invalid range from " + lower + " to " + upper);
@@ -90,15 +119,15 @@ public interface Validator {
protected abstract boolean inRange(String value, Object lower, Object upper);
}
- static class RangeValidator implements Validator {
+ class RangeValidator implements Validator {
- private final RANGE_TYPE type;
+ private final TYPE type;
private final Object lower, upper;
public RangeValidator(Object lower, Object upper) {
this.lower = lower;
this.upper = upper;
- this.type = RANGE_TYPE.valueOf(lower, upper);
+ this.type = TYPE.valueOf(lower, upper);
}
@Override
@@ -115,9 +144,23 @@ public interface Validator {
}
return null;
}
+
+ @Override
+ public String toDescription() {
+ if (lower == null && upper == null) {
+ return null;
+ }
+ if (lower != null && upper != null) {
+ return "Expects value between " + lower + " and " + upper;
+ }
+ if (lower != null) {
+ return "Expects value bigger than " + lower;
+ }
+ return "Expects value smaller than " + upper;
+ }
}
- static class PatternSet implements Validator {
+ class PatternSet implements Validator {
private final List<Pattern> expected = new ArrayList<Pattern>();
@@ -139,15 +182,20 @@ public interface Validator {
}
return "Invalid value.. expects one of patterns " + expected;
}
+
+ @Override
+ public String toDescription() {
+ return "Expects one of the pattern in " + expected;
+ }
}
- static class RatioValidator implements Validator {
+ class RatioValidator implements Validator {
@Override
public String validate(String value) {
try {
float fvalue = Float.valueOf(value);
- if (fvalue <= 0 || fvalue >= 1) {
+ if (fvalue < 0 || fvalue > 1) {
return "Invalid ratio " + value + ", which should be in between 0 to 1";
}
} catch (NumberFormatException e) {
@@ -155,5 +203,77 @@ public interface Validator {
}
return null;
}
+
+ @Override
+ public String toDescription() {
+ return "Expects value between 0.0f and 1.0f";
+ }
+ }
+
+ class TimeValidator implements Validator {
+
+ private final TimeUnit timeUnit;
+
+ private final Long min;
+ private final boolean minInclusive;
+
+ private final Long max;
+ private final boolean maxInclusive;
+
+ public TimeValidator(TimeUnit timeUnit) {
+ this(timeUnit, null, false, null, false);
+ }
+
+ public TimeValidator(TimeUnit timeUnit,
+ Long min, boolean minInclusive, Long max, boolean maxInclusive) {
+ this.timeUnit = timeUnit;
+ this.min = min;
+ this.minInclusive = minInclusive;
+ this.max = max;
+ this.maxInclusive = maxInclusive;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ @Override
+ public String validate(String value) {
+ try {
+ long time = HiveConf.toTime(value, timeUnit, timeUnit);
+ if (min != null && (minInclusive ? time < min : time <= min)) {
+ return value + " is smaller than " + timeString(min);
+ }
+ if (max != null && (maxInclusive ? time > max : time >= max)) {
+ return value + " is bigger than " + timeString(max);
+ }
+ } catch (Exception e) {
+ return e.toString();
+ }
+ return null;
+ }
+
+ public String toDescription() {
+ String description =
+ "Expects a time value with unit " +
+ "(d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec)" +
+ ", which is " + HiveConf.stringFor(timeUnit) + " if not specified";
+ if (min != null && max != null) {
+ description += ".\nThe time should be in between " +
+ timeString(min) + (minInclusive ? " (inclusive)" : " (exclusive)") + " and " +
+ timeString(max) + (maxInclusive ? " (inclusive)" : " (exclusive)");
+ } else if (min != null) {
+ description += ".\nThe time should be bigger than " +
+ (minInclusive ? "or equal to " : "") + timeString(min);
+ } else if (max != null) {
+ description += ".\nThe time should be smaller than " +
+ (maxInclusive ? "or equal to " : "") + timeString(max);
+ }
+ return description;
+ }
+
+ private String timeString(long time) {
+ return time + " " + HiveConf.stringFor(timeUnit);
+ }
}
}
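For illustration, a minimal sketch (not part of this commit) of TimeValidator in action, matching the validate() and timeString() code above:

    TimeValidator seconds = new TimeValidator(TimeUnit.SECONDS);
    seconds.validate("30s");   // null (valid)
    seconds.validate("30");    // null: a bare number falls back to the default unit (seconds here)
    seconds.validate("oops");  // returns the exception text, e.g. a NumberFormatException's toString()

    // A bounded validator, like the one on hive.server2.session.check.interval:
    TimeValidator bounded = new TimeValidator(TimeUnit.MILLISECONDS, 3000L, true, null, false);
    bounded.validate("1s");    // "1s is smaller than 3000 msec"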
Modified: hive/branches/spark/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hive/common/util/HiveStringUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hive/common/util/HiveStringUtils.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hive/common/util/HiveStringUtils.java Mon Sep 8 04:38:17 2014
@@ -33,9 +33,13 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.Locale;
import java.util.StringTokenizer;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
@@ -57,10 +61,62 @@ public class HiveStringUtils {
public static final int SHUTDOWN_HOOK_PRIORITY = 0;
private static final DecimalFormat decimalFormat;
+
+ /**
+ * Maintain a String pool to reduce memory.
+ */
+ private static final Interner<String> STRING_INTERNER;
+
static {
- NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ENGLISH);
- decimalFormat = (DecimalFormat) numberFormat;
- decimalFormat.applyPattern("#.##");
+ NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ENGLISH);
+ decimalFormat = (DecimalFormat) numberFormat;
+ decimalFormat.applyPattern("#.##");
+
+ STRING_INTERNER = Interners.newWeakInterner();
+ }
+
+ /**
+ * Return the interned string, or null if the given string is null.
+ * @param str The string to intern
+ * @return The identical string cached in the string pool.
+ */
+ public static String intern(String str) {
+ if(str == null) {
+ return null;
+ }
+ return STRING_INTERNER.intern(str);
+ }
+
+ /**
+ * Return an interned list with identical contents as the given list.
+ * @param list The list whose strings will be interned
+ * @return An identical list with its strings interned.
+ */
+ public static List<String> intern(List<String> list) {
+ if(list == null) {
+ return null;
+ }
+ List<String> newList = new ArrayList<String>(list.size());
+ for(String str : list) {
+ newList.add(intern(str));
+ }
+ return newList;
+ }
+
+ /**
+ * Return an interned map with identical contents as the given map.
+ * @param map The map whose strings will be interned
+ * @return An identical map with its strings interned.
+ */
+ public static Map<String, String> intern(Map<String, String> map) {
+ if(map == null) {
+ return null;
+ }
+ Map<String, String> newMap = new HashMap<String, String>(map.size());
+ for(Map.Entry<String, String> entry : map.entrySet()) {
+ newMap.put(intern(entry.getKey()), intern(entry.getValue()));
+ }
+ return newMap;
}
/**
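For illustration, a minimal sketch (not part of this commit) of the new interner; because it is Guava's weak interner, pooled strings stay garbage-collectable once unreferenced:

    String a = HiveStringUtils.intern(new String("hdfs://namenode:8020/warehouse"));
    String b = HiveStringUtils.intern(new String("hdfs://namenode:8020/warehouse"));
    assert a == b;  // duplicate metadata strings collapse to a single pooled instance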
Modified: hive/branches/spark/common/src/test/org/apache/hadoop/hive/common/type/TestHiveDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/test/org/apache/hadoop/hive/common/type/TestHiveDecimal.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/common/src/test/org/apache/hadoop/hive/common/type/TestHiveDecimal.java (original)
+++ hive/branches/spark/common/src/test/org/apache/hadoop/hive/common/type/TestHiveDecimal.java Mon Sep 8 04:38:17 2014
@@ -68,6 +68,13 @@ public class TestHiveDecimal {
Assert.assertEquals("0.02", HiveDecimal.enforcePrecisionScale(new BigDecimal("0.015"), 3, 2).toString());
Assert.assertEquals("0.01", HiveDecimal.enforcePrecisionScale(new BigDecimal("0.0145"), 3, 2).toString());
+ // Rounding numbers that increase int digits
+ Assert.assertEquals("10",
+ HiveDecimal.enforcePrecisionScale(new BigDecimal("9.5"), 2, 0).toString());
+ Assert.assertNull(HiveDecimal.enforcePrecisionScale(new BigDecimal("9.5"), 1, 0));
+ Assert.assertEquals("9",
+ HiveDecimal.enforcePrecisionScale(new BigDecimal("9.4"), 1, 0).toString());
+
// Integers with no scale values are not modified (zeros are not null)
Assert.assertEquals("0", HiveDecimal.enforcePrecisionScale(new BigDecimal("0"), 1, 0).toString());
Assert.assertEquals("30", HiveDecimal.enforcePrecisionScale(new BigDecimal("30"), 2, 0).toString());
Propchange: hive/branches/spark/hbase-handler/pom.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Sep 8 04:38:17 2014
@@ -0,0 +1,4 @@
+/hive/branches/branch-0.11/hbase-handler/pom.xml:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
+/hive/branches/tez/hbase-handler/pom.xml:1494760-1622766
+/hive/branches/vectorization/hbase-handler/pom.xml:1466908-1527856
+/hive/trunk/hbase-handler/pom.xml:1494760-1537575,1608589-1623262
Modified: hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java (original)
+++ hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java Mon Sep 8 04:38:17 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -425,7 +426,7 @@ public class HCatUtil {
try {
Class<? extends HiveStorageHandler> handlerClass =
(Class<? extends HiveStorageHandler>) Class
- .forName(storageHandler, true, JavaUtils.getClassLoader());
+ .forName(storageHandler, true, Utilities.getSessionSpecifiedClassLoader());
return (HiveStorageHandler) ReflectionUtils.newInstance(
handlerClass, conf);
} catch (ClassNotFoundException e) {
Modified: hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java (original)
+++ hive/branches/spark/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/DataType.java Mon Sep 8 04:38:17 2014
@@ -224,7 +224,7 @@ public abstract class DataType {
if (o1[i] == o2[i]) {
continue;
}
- if (o1[i] > o1[i]) {
+ if (o1[i] > o2[i]) {
return 1;
} else {
return -1;
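A note on the one-character fix above: the old expression o1[i] > o1[i] can never be true, so any pair of differing bytes fell through to -1. A minimal sketch (not part of this commit):

    byte[] x = {2};
    byte[] y = {1};
    // before the fix: compare(x, y) returned -1, because o1[i] > o1[i] is always false
    // after the fix:  compare(x, y) returns 1, since 2 > 1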
Modified: hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java (original)
+++ hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/TestPermsGrp.java Mon Sep 8 04:38:17 2014
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
@@ -91,13 +92,11 @@ public class TestPermsGrp extends TestCa
hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://127.0.0.1:" + msPort);
hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
- hcatConf.setIntVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 120);
hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hcatConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname, "60");
+ hcatConf.setTimeVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 60, TimeUnit.SECONDS);
hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
clientWH = new Warehouse(hcatConf);
msc = new HiveMetaStoreClient(hcatConf, null);
Modified: hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java (original)
+++ hive/branches/spark/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java Mon Sep 8 04:38:17 2014
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -116,7 +117,7 @@ public class TestHCatPartitionPublish {
+ msPort);
hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
- hcatConf.setIntVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 120);
+ hcatConf.setTimeVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 120, TimeUnit.SECONDS);
hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
HCatSemanticAnalyzer.class.getName());
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
Modified: hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (original)
+++ hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java Mon Sep 8 04:38:17 2014
@@ -199,7 +199,8 @@ public class HCatLoader extends HCatBase
throws IOException {
Table table = phutil.getTable(location,
hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
- PigHCatUtil.getHCatServerPrincipal(job));
+ PigHCatUtil.getHCatServerPrincipal(job),
+ job); // Pass job to initialize metastore conf overrides
List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
String[] partitionKeys = new String[tablePartitionKeys.size()];
for (int i = 0; i < tablePartitionKeys.size(); i++) {
@@ -215,7 +216,11 @@ public class HCatLoader extends HCatBase
Table table = phutil.getTable(location,
hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
- PigHCatUtil.getHCatServerPrincipal(job));
+ PigHCatUtil.getHCatServerPrincipal(job),
+
+ // Pass job to initialize metastore conf overrides for embedded metastore case
+ // (hive.metastore.uris = "").
+ job);
HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
try {
PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
Modified: hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java (original)
+++ hive/branches/spark/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java Mon Sep 8 04:38:17 2014
@@ -142,8 +142,16 @@ class PigHCatUtil {
}
private static HiveMetaStoreClient getHiveMetaClient(String serverUri,
- String serverKerberosPrincipal, Class<?> clazz) throws Exception {
- HiveConf hiveConf = new HiveConf(clazz);
+ String serverKerberosPrincipal,
+ Class<?> clazz,
+ Job job) throws Exception {
+
+ // The job configuration is passed in so the configuration will be cloned
+ // from the pig job configuration. This is necessary for overriding
+ // metastore configuration arguments like the metastore jdbc connection string
+ // and password, in the case of an embedded metastore, which you get when
+ // hive.metastore.uris = "".
+ HiveConf hiveConf = new HiveConf(job.getConfiguration(), clazz);
if (serverUri != null) {
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
@@ -178,7 +186,13 @@ class PigHCatUtil {
return new HCatSchema(fcols);
}
- public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException {
+ /*
+ * The job argument is passed so that configuration overrides can be used to initialize
+ * the metastore configuration in the special case of an embedded metastore
+ * (hive.metastore.uris = "").
+ */
+ public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal,
+ Job job) throws IOException {
Pair<String, String> loc_server = new Pair<String, String>(location, hcatServerUri);
Table hcatTable = hcatTableCache.get(loc_server);
if (hcatTable != null) {
@@ -191,7 +205,7 @@ class PigHCatUtil {
Table table = null;
HiveMetaStoreClient client = null;
try {
- client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
+ client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class, job);
table = HCatUtil.getTable(client, dbName, tableName);
} catch (NoSuchObjectException nsoe) {
throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Mon Sep 8 04:38:17 2014
@@ -1422,7 +1422,7 @@ sub run
$testStatuses->{$testName} = $failedStr;
}
- $msg= "$msg at " . time . "\n";
+ $msg .= "\nEnding test $testName at " . $endTime ."\n";
#print $msg;
print $log $msg;
$duration = $endTime - $beginTime;
@@ -1435,6 +1435,7 @@ sub run
if ($@) {
$msg= "ERROR $subName at : ".__LINE__." Failed to run test $testName <$@>\n";
+ $msg .= "Ending test $testName at " . time ."\n";
#print $msg;
print $log $msg;
$testStatuses->{$testName} = $abortedStr;
Modified: hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Mon Sep 8 04:38:17 2014
@@ -107,7 +107,7 @@ public class HiveEndPoint {
public StreamingConnection newConnection(final boolean createPartIfNotExists)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
, ImpersonationFailed , InterruptedException {
- return newConnection(null, createPartIfNotExists, null);
+ return newConnection(createPartIfNotExists, null, null);
}
/**
@@ -126,67 +126,63 @@ public class HiveEndPoint {
public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
, ImpersonationFailed , InterruptedException {
- return newConnection(null, createPartIfNotExists, conf);
+ return newConnection(createPartIfNotExists, conf, null);
}
/**
* Acquire a new connection to MetaStore for streaming
- * @param proxyUser User on whose behalf all hdfs and hive operations will be
- * performed on this connection. Set it to null or empty string
- * to connect as user of current process without impersonation.
- * Currently this argument is not supported and must be null
* @param createPartIfNotExists If true, the partition specified in the endpoint
* will be auto created if it does not exist
+ * @param authenticatedUser UserGroupInformation object obtained from successful authentication.
+ * Uses insecure mode if this argument is null.
* @return
- * @throws ConnectionError if problem connecting
+ * @throws ConnectionError if there is a connection problem
* @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
- * @throws ImpersonationFailed if not able to impersonate 'proxyUser'
+ * @throws ImpersonationFailed if not able to run as the given 'authenticatedUser'
* @throws IOException if there was an I/O error when acquiring connection
* @throws PartitionCreationFailed if failed to create partition
* @throws InterruptedException
*/
- private StreamingConnection newConnection(final String proxyUser,
- final boolean createPartIfNotExists, final HiveConf conf)
+ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+ final UserGroupInformation authenticatedUser)
throws ConnectionError, InvalidPartition,
InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
- if (proxyUser ==null || proxyUser.trim().isEmpty() ) {
- return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf);
+
+ if (authenticatedUser == null) {
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
}
- final UserGroupInformation ugi = getUserGroupInfo(proxyUser);
+
try {
- return ugi.doAs (
- new PrivilegedExceptionAction<StreamingConnection>() {
+ return authenticatedUser.doAs (
+ new PrivilegedExceptionAction<StreamingConnection>() {
@Override
public StreamingConnection run()
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf);
+ return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf);
}
- }
+ }
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed to impersonate '" + proxyUser +
- "' when acquiring connection", e);
+ throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
}
}
-
-
- private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi,
+ private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
boolean createPartIfNotExists, HiveConf conf)
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists);
+ return new ConnectionImpl(this, ugi, conf, createPartIfNotExists);
}
- private static UserGroupInformation getUserGroupInfo(String proxyUser)
+ private static UserGroupInformation getUserGroupInfo(String user)
throws ImpersonationFailed {
try {
return UserGroupInformation.createProxyUser(
- proxyUser, UserGroupInformation.getLoginUser());
+ user, UserGroupInformation.getLoginUser());
} catch (IOException e) {
- LOG.error("Unable to login as proxy user. Exception follows.", e);
- throw new ImpersonationFailed(proxyUser,e);
+ LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+ throw new ImpersonationFailed(user,e);
}
}
@@ -242,14 +238,12 @@ public class HiveEndPoint {
private static class ConnectionImpl implements StreamingConnection {
private final IMetaStoreClient msClient;
private final HiveEndPoint endPt;
- private final String proxyUser;
private final UserGroupInformation ugi;
+ private final String username;
/**
- *
* @param endPoint end point to connect to
- * @param proxyUser can be null
- * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled
+ * @param ugi user on whose behalf streaming is done; if null, the process user is used without impersonation
* @param conf HiveConf object
* @param createPart create the partition if it does not exist
* @throws ConnectionError if there is trouble connecting
@@ -257,15 +251,15 @@ public class HiveEndPoint {
* @throws InvalidTable if specified table does not exist
* @throws PartitionCreationFailed if createPart=true and not able to create partition
*/
- private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi,
+ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
HiveConf conf, boolean createPart)
throws ConnectionError, InvalidPartition, InvalidTable
, PartitionCreationFailed {
- this.proxyUser = proxyUser;
this.endPt = endPoint;
this.ugi = ugi;
+ this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
if (conf==null) {
- conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri);
+ conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
}
this.msClient = getMetaStoreClient(endPoint, conf);
if (createPart && !endPoint.partitionVals.isEmpty()) {
@@ -324,21 +318,21 @@ public class HiveEndPoint {
return ugi.doAs (
new PrivilegedExceptionAction<TransactionBatch>() {
@Override
- public TransactionBatch run() throws StreamingException {
+ public TransactionBatch run() throws StreamingException, InterruptedException {
return fetchTransactionBatchImpl(numTransactions, recordWriter);
}
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when acquiring Transaction Batch on endPoint " + endPt, e);
+ throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+ + "' when acquiring Transaction Batch on endPoint " + endPt, e);
}
}
private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable {
- return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
, recordWriter);
}
@@ -349,7 +343,10 @@ public class HiveEndPoint {
if (ep.partitionVals.isEmpty()) {
return;
}
- SessionState state = SessionState.start(new CliSessionState(conf));
+ SessionState localSession = null;
+ if(SessionState.get() == null) {
+ localSession = SessionState.start(new CliSessionState(conf));
+ }
Driver driver = new Driver(conf);
try {
@@ -378,7 +375,9 @@ public class HiveEndPoint {
} finally {
driver.close();
try {
- state.close();
+ if(localSession != null) {
+ localSession.close();
+ }
} catch (IOException e) {
LOG.warn("Error closing SessionState used to run Hive DDL.");
}
@@ -445,7 +444,7 @@ public class HiveEndPoint {
} // class ConnectionImpl
private static class TransactionBatchImpl implements TransactionBatch {
- private final String proxyUser;
+ private final String username;
private final UserGroupInformation ugi;
private final HiveEndPoint endPt;
private final IMetaStoreClient msClient;
@@ -461,7 +460,7 @@ public class HiveEndPoint {
/**
* Represents a batch of transactions acquired from MetaStore
*
- * @param proxyUser
+ * @param user
* @param ugi
* @param endPt
* @param numTxns
@@ -470,9 +469,9 @@ public class HiveEndPoint {
* @throws StreamingException if failed to create new RecordUpdater for batch
* @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
*/
- private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt
- , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter)
- throws StreamingException, TransactionBatchUnAvailable {
+ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt
+ , final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter)
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
try {
if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
Table tableObj = msClient.getTable(endPt.database, endPt.table);
@@ -481,20 +480,38 @@ public class HiveEndPoint {
} else {
partNameForLock = null;
}
- this.proxyUser = proxyUser;
+ this.username = user;
this.ugi = ugi;
this.endPt = endPt;
this.msClient = msClient;
this.recordWriter = recordWriter;
- this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids();
+ txnIds = openTxnImpl(msClient, user, numTxns, ugi);
this.currentTxnIndex = -1;
this.state = TxnState.INACTIVE;
recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
} catch (TException e) {
throw new TransactionBatchUnAvailable(endPt, e);
+ } catch (IOException e) {
+ throw new TransactionBatchUnAvailable(endPt, e);
}
}
+ private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
+ throws IOException, TException, InterruptedException {
+ if(ugi==null) {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return msClient.openTxns(user, numTxns).getTxn_ids();
+ }
+ });
+ }
+
@Override
public String toString() {
if (txnIds==null || txnIds.isEmpty()) {
@@ -526,8 +543,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
- "' when switch to next Transaction for endPoint :" + endPt, e);
+ throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
+ "' in Txn batch :" + this, e);
}
}
@@ -536,7 +553,7 @@ public class HiveEndPoint {
throw new InvalidTrasactionState("No more transactions available in" +
" current batch for end point : " + endPt);
++currentTxnIndex;
- lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId());
+ lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId());
try {
LockResponse res = msClient.lock(lockRequest);
if (res.getState() != LockState.ACQUIRED) {
@@ -551,11 +568,14 @@ public class HiveEndPoint {
/**
* Get Id of currently open transaction
- * @return
+ * @return -1 if there is no open TX
*/
@Override
public Long getCurrentTxnId() {
- return txnIds.get(currentTxnIndex);
+ if(currentTxnIndex >= 0) {
+ return txnIds.get(currentTxnIndex);
+ }
+ return -1L;
}
/**
@@ -608,8 +628,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ throw new ImpersonationFailed("Failed writing as user '" + username +
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ getCurrentTxnId(), e);
}
}
@@ -641,8 +661,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser +
- "' when writing to endPoint :" + endPt + ". Transaction Id: "
+ throw new ImpersonationFailed("Failed writing as user '" + username +
+ "' to endPoint :" + endPt + ". Transaction Id: "
+ getCurrentTxnId(), e);
}
}
@@ -680,9 +700,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when committing Txn on endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+ + username + "' on endPoint :" + endPt, e);
}
}
@@ -726,9 +745,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: "
- + getCurrentTxnId(), e);
+ throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId() + " as user '"
+ + username + "' on endPoint :" + endPt, e);
}
}
@@ -784,8 +802,8 @@ public class HiveEndPoint {
}
);
} catch (IOException e) {
- throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser +
- "' when closing Txn Batch on endPoint :" + endPt, e);
+ throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
+ "' on endPoint :" + endPt, e);
}
}
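The signature change above drops the never-supported proxyUser argument in favor of a pre-authenticated UserGroupInformation. A minimal sketch of a secure-mode caller, assuming a keytab login; the principal, keytab path, metastore URI, and table/partition values are all illustrative:

    import java.util.Arrays;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;

    // Authenticate first; passing null instead of the UGI selects the unsecure path.
    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        "streaming@EXAMPLE.COM", "/etc/security/streaming.keytab");
    HiveEndPoint endPt = new HiveEndPoint("thrift://metastore-host:9083",
        "default", "web_logs", Arrays.asList("2014", "09"));
    StreamingConnection conn = endPt.newConnection(true, null, ugi);
    try {
      // fetch transaction batches and write records here
    } finally {
      conn.close();
    }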
Modified: hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Mon Sep 8 04:38:17 2014
@@ -54,6 +54,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
@@ -67,6 +69,7 @@ import java.util.Map;
public class TestStreaming {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
public static class RawFileSystem extends RawLocalFileSystem {
private static final URI NAME;
@@ -636,18 +639,25 @@ public class TestStreaming {
connection.close();
}
- class WriterThd extends Thread {
+ private static class WriterThd extends Thread {
- private StreamingConnection conn;
- private HiveEndPoint ep;
- private DelimitedInputWriter writer;
- private String data;
+ private final StreamingConnection conn;
+ private final DelimitedInputWriter writer;
+ private final String data;
+ private Throwable error;
WriterThd(HiveEndPoint ep, String data) throws Exception {
- this.ep = ep;
+ super("Writer_" + data);
writer = new DelimitedInputWriter(fieldNames, ",", ep);
conn = ep.newConnection(false);
this.data = data;
+ setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ error = throwable;
+ LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+ }
+ });
}
@Override
@@ -668,14 +678,14 @@ public class TestStreaming {
try {
txnBatch.close();
} catch (Exception e) {
+ LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
conn.close();
- throw new RuntimeException(e);
}
}
try {
conn.close();
} catch (Exception e) {
- throw new RuntimeException(e);
+ LOG.error("conn.close() failed: " + e.getMessage(), e);
}
}
@@ -685,18 +695,23 @@ public class TestStreaming {
@Test
public void testConcurrentTransactionBatchCommits() throws Exception {
final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
- WriterThd t1 = new WriterThd(ep, "1,Matrix");
- WriterThd t2 = new WriterThd(ep, "2,Gandhi");
- WriterThd t3 = new WriterThd(ep, "3,Silence");
-
- t1.start();
- t2.start();
- t3.start();
-
- t1.join();
- t2.join();
- t3.join();
-
+ List<WriterThd> writers = new ArrayList<WriterThd>(3);
+ writers.add(new WriterThd(ep, "1,Matrix"));
+ writers.add(new WriterThd(ep, "2,Gandhi"));
+ writers.add(new WriterThd(ep, "3,Silence"));
+
+ for(WriterThd w : writers) {
+ w.start();
+ }
+ for(WriterThd w : writers) {
+ w.join();
+ }
+ for(WriterThd w : writers) {
+ if(w.error != null) {
+ Assert.fail("Writer thread " + w.getName() + " died: " + w.error.getMessage() +
+ ". See log file for stack trace");
+ }
+ }
}
// delete db and all tables in it
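The WriterThd rework above hinges on Thread.setUncaughtExceptionHandler: a worker thread cannot propagate an exception to the JUnit thread through join(), so the handler records it for a later assertion. A stand-alone sketch of that pattern; the thread name and the simulated failure are illustrative:

    // Only the handler-then-join pattern mirrors the test change above.
    final Throwable[] failure = new Throwable[1];
    Thread worker = new Thread("worker-1") {
      @Override
      public void run() {
        throw new IllegalStateException("simulated writer failure");
      }
    };
    worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        failure[0] = e; // without this, the test thread never sees the error
      }
    });
    worker.start();
    worker.join();
    if (failure[0] != null) {
      org.junit.Assert.fail("worker died: " + failure[0].getMessage());
    }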
Modified: hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Mon Sep 8 04:38:17 2014
@@ -22,8 +22,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
@@ -49,7 +49,7 @@ public abstract class HCatClient {
HCatClientHMSImpl.class.getName());
try {
Class<? extends HCatClient> clientClass = Class.forName(className,
- true, JavaUtils.getClassLoader()).asSubclass(
+ true, Utilities.getSessionSpecifiedClassLoader()).asSubclass(
HCatClient.class);
client = (HCatClient) clientClass.newInstance();
} catch (ClassNotFoundException e) {
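Switching from JavaUtils.getClassLoader() to Utilities.getSessionSpecifiedClassLoader() means a custom HCatClient implementation placed on the session classpath (for example via ADD JAR) can now be resolved. A minimal usage sketch, assuming a remote metastore; the URI is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.api.HCatClient;

    Configuration conf = new Configuration();
    conf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // illustrative
    // The impl class named in the config is now loaded via the session classloader.
    HCatClient client = HCatClient.create(conf);
    try {
      System.out.println(client.listDatabaseNamesByPattern("*"));
    } finally {
      client.close();
    }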