You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text conversion.
Posted to commits@hive.apache.org by am...@apache.org on 2013/05/08 06:43:18 UTC
svn commit: r1480158 [1/2] - in /hive/branches/HIVE-4115: ./ common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ hcatalog/
hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/
hcatalog/src/test/e2e/templeton/drivers...
Author: amareshwari
Date: Wed May 8 04:43:16 2013
New Revision: 1480158
URL: http://svn.apache.org/r1480158
Log:
Merging r1476037 through r1480157 into HIVE-4115
Added:
hive/branches/HIVE-4115/data/files/array_table.txt
- copied unchanged from r1480157, hive/trunk/data/files/array_table.txt
hive/branches/HIVE-4115/data/files/map_table.txt
- copied unchanged from r1480157, hive/trunk/data/files/map_table.txt
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
- copied unchanged from r1480157, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/insert_overwrite_local_directory_1.q
- copied unchanged from r1480157, hive/trunk/ql/src/test/queries/clientpositive/insert_overwrite_local_directory_1.q
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/ptf_register_tblfn.q
- copied unchanged from r1480157, hive/trunk/ql/src/test/queries/clientpositive/ptf_register_tblfn.q
hive/branches/HIVE-4115/ql/src/test/results/clientpositive/insert_overwrite_local_directory_1.q.out
- copied unchanged from r1480157, hive/trunk/ql/src/test/results/clientpositive/insert_overwrite_local_directory_1.q.out
hive/branches/HIVE-4115/ql/src/test/results/clientpositive/ptf_register_tblfn.q.out
- copied unchanged from r1480157, hive/trunk/ql/src/test/results/clientpositive/ptf_register_tblfn.q.out
Removed:
hive/branches/HIVE-4115/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java.orig
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java
Modified:
hive/branches/HIVE-4115/ (props changed)
hive/branches/HIVE-4115/build.properties
hive/branches/HIVE-4115/build.xml
hive/branches/HIVE-4115/common/build.xml
hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/HIVE-4115/conf/hive-default.xml.template
hive/branches/HIVE-4115/hcatalog/build.xml
hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java
hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCovariance.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEWAHBitmap.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/column_access_stats.q
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/input4.q
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/join39.q
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/join40.q
hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/mapjoin1.q
hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/join39.q.out
hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/join40.q.out
hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/mapjoin1.q.out
hive/branches/HIVE-4115/ql/src/test/results/clientpositive/column_access_stats.q.out
hive/branches/HIVE-4115/ql/src/test/results/clientpositive/input4.q.out
hive/branches/HIVE-4115/ql/src/test/results/clientpositive/show_functions.q.out
hive/branches/HIVE-4115/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
Propchange: hive/branches/HIVE-4115/
------------------------------------------------------------------------------
Merged /hive/trunk:r1476037-1480157
Modified: hive/branches/HIVE-4115/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/build.properties?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/build.properties (original)
+++ hive/branches/HIVE-4115/build.properties Wed May 8 04:43:16 2013
@@ -75,7 +75,7 @@ common.jar=${hadoop.root}/lib/commons-ht
# module names needed for build process
iterate.hive.all=ant,shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
iterate.hive.modules=shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
-iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,metastore,odbc,serde,service,hcatalog
+iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,beeline,metastore,odbc,serde,service,hcatalog
iterate.hive.thrift=ql,service,metastore,serde
iterate.hive.protobuf=ql
iterate.hive.cpp=odbc
Modified: hive/branches/HIVE-4115/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/build.xml (original)
+++ hive/branches/HIVE-4115/build.xml Wed May 8 04:43:16 2013
@@ -506,8 +506,8 @@
<!-- Package the hcat stuff and pull it up into Hive's build dir -->
<ant antfile="${hive.root}/hcatalog/build.xml" target="package"
inheritAll="false"/>
- <mkdir dir="${build.dir.hive}/hcatalog"/>
- <copy todir="${build.dir.hive}/hcatalog">
+ <mkdir dir="${target.dir}/hcatalog"/>
+ <copy todir="${target.dir}/hcatalog">
<fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
</copy>
</target>
@@ -711,6 +711,7 @@
<tarfileset dir="${hive.root}" mode="664" prefix="${tar.final.name}/src"
excludes="${vcs.excludes}">
<exclude name="build/**" />
+ <exclude name="hcatalog/**/build/**" />
<exclude name="bin/**" />
<exclude name="**/py/**/*-remote" />
<exclude name="data/scripts/**" />
@@ -720,6 +721,7 @@
<tarfileset dir="${hive.root}" mode="755" prefix="${tar.final.name}/src"
excludes="${vcs.excludes}">
<exclude name="build/**" />
+ <exclude name="hcatalog/**/build/**" />
<include name="bin/**" />
<include name="**/py/**/*-remote" />
<include name="data/scripts/**" />
@@ -750,9 +752,6 @@
<exclude name="docs/**"/>
<exclude name="lib/py/**/*-remote"/>
</tarfileset>
- <tarfileset dir="${build.dir.hive}/hcatalog" mode="755" prefix="${bin.final.name}"
- excludes="${vcs.excludes}">
- </tarfileset>
</param.listofitems>
</macro_tar>
</target>
Modified: hive/branches/HIVE-4115/common/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/common/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/common/build.xml (original)
+++ hive/branches/HIVE-4115/common/build.xml Wed May 8 04:43:16 2013
@@ -25,6 +25,7 @@ to call at top-level: ant deploy-contrib
<project name="common" default="jar">
<property name="src.dir" location="${basedir}/src/java"/>
+ <property name="src.gen.dir" location="${basedir}/src/gen"/>
<import file="../build-common.xml"/>
<target name="compile" depends="init, setup, ivy-retrieve">
@@ -36,7 +37,7 @@ to call at top-level: ant deploy-contrib
</exec>
<javac
encoding="${build.encoding}"
- srcdir="${src.dir}"
+ srcdir="${src.dir}:${src.gen.dir}"
includes="**/*.java"
destdir="${build.classes}"
debug="${javac.debug}"
Modified: hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed May 8 04:43:16 2013
@@ -415,8 +415,6 @@ public class HiveConf extends Configurat
HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
- HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
- HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5),
HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),
Modified: hive/branches/HIVE-4115/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/conf/hive-default.xml.template?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/conf/hive-default.xml.template (original)
+++ hive/branches/HIVE-4115/conf/hive-default.xml.template Wed May 8 04:43:16 2013
@@ -595,12 +595,6 @@
</property>
<property>
- <name>hive.mapjoin.cache.numrows</name>
- <value>25000</value>
- <description>How many rows should be cached by jdbm for map join. </description>
-</property>
-
-<property>
<name>hive.optimize.skewjoin</name>
<value>false</value>
<description>Whether to enable skew join optimization.
Modified: hive/branches/HIVE-4115/hcatalog/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/build.xml (original)
+++ hive/branches/HIVE-4115/hcatalog/build.xml Wed May 8 04:43:16 2013
@@ -91,18 +91,23 @@
<target name="gen-test" description="Generate tests, a no-op for hcat"/>
<target name="test" depends="jar" description="run unit tests">
- <ant target="test" dir="core" inheritAll="false"/>
- <ant target="test" dir="hcatalog-pig-adapter" inheritAll="false"/>
- <ant target="test" dir="server-extensions" inheritAll="false"/>
- <ant target="test" dir="webhcat/svr" inheritAll="false"/>
- <ant target="test" dir="webhcat/java-client" inheritAll="false"/>
- <ant target="test" dir="storage-handlers/hbase" inheritAll="false"/>
- <!-- One checkstyle run for the whole repo. Runs after junit tests
- to piggyback on resolved jars. -->
- <path id="checkstyle.class.path">
- <fileset dir="core/build/lib/test"/>
- </path>
- <antcall target="checkstyle" inheritRefs="true"/>
+ <!-- Placed in a parallel structure so that the tests keep going
+ even if some fail. Otherwise a failure in one of the earlier ant
+ call terminates the target and the rest do not run. -->
+ <parallel threadCount="1">
+ <ant target="test" dir="core" inheritAll="false"/>
+ <ant target="test" dir="hcatalog-pig-adapter" inheritAll="false"/>
+ <ant target="test" dir="server-extensions" inheritAll="false"/>
+ <ant target="test" dir="webhcat/svr" inheritAll="false"/>
+ <ant target="test" dir="webhcat/java-client" inheritAll="false"/>
+ <ant target="test" dir="storage-handlers/hbase" inheritAll="false"/>
+ <!-- One checkstyle run for the whole repo. Runs after junit tests
+ to piggyback on resolved jars. -->
+ <path id="checkstyle.class.path">
+ <fileset dir="core/build/lib/test"/>
+ </path>
+ <antcall target="checkstyle" inheritRefs="true"/>
+ </parallel>
</target>
<target name="compile-test" depends="jar" description="compile unit tests">
Modified: hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Wed May 8 04:43:16 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
public class NotificationListener extends MetaStoreEventListener {
private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
- protected Session session;
protected Connection conn;
private static MessageFactory messageFactory = MessageFactory.getInstance();
+ public static final int NUM_RETRIES = 1;
+ private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+ private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+ protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+ @Override
+ protected Session initialValue() {
+ try {
+ return createSession();
+ } catch (Exception e) {
+ LOG.error("Couldn't create JMS Session", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (get() != null) {
+ try {
+ get().close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS session, ignored error", e);
+ }
+ }
+ super.remove();
+ }
+ };
/**
* Create message bus connection and session in constructor.
*/
public NotificationListener(final Configuration conf) {
-
super(conf);
- createConnection();
+ testAndCreateConnection();
}
private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
// Subscriber can get notification about drop of a database in HCAT
// by listening on a topic named "HCAT" and message selector string
// as "HCAT_EVENT = HCAT_DROP_DATABASE"
- if (dbEvent.getStatus()) {
+ if (dbEvent.getStatus()) {
String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
}
@@ -216,7 +241,7 @@ public class NotificationListener extend
}
}
- private String getTopicPrefix(HiveConf conf) {
+ private String getTopicPrefix(Configuration conf) {
return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
}
@@ -253,86 +278,102 @@ public class NotificationListener extend
* @param topicName is the name on message broker on which message is sent.
*/
protected void send(HCatEventMessage hCatEventMessage, String topicName) {
- try {
- if(null == session){
- // this will happen, if we never able to establish a connection.
- createConnection();
- if (null == session){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
- }
- }
-
- Destination topic = getTopic(topicName);
+ send(hCatEventMessage, topicName, NUM_RETRIES);
+ }
- if (null == topic){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
+ /**
+ * @param hCatEventMessage The HCatEventMessage being sent over JMS, this method is threadsafe
+ * @param topicName is the name on message broker on which message is sent.
+ * @param retries the number of retry attempts
+ */
+ protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+ try {
+ if (session.get() == null) {
+ // Need to reconnect
+ throw new JMSException("Invalid JMS session");
}
-
- MessageProducer producer = session.createProducer(topic);
- Message msg = session.createTextMessage(hCatEventMessage.toString());
+ Destination topic = createTopic(topicName);
+ Message msg = session.get().createTextMessage(hCatEventMessage.toString());
msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+ MessageProducer producer = createProducer(topic);
producer.send(msg);
// Message must be transacted before we return.
- session.commit();
- }
- catch(Exception e){
- // Gobble up the exception. Message delivery is best effort.
- LOG.error("Failed to send message on topic: " + topicName +
- " event: " + hCatEventMessage.getEventType(), e);
+ session.get().commit();
+ } catch (Exception e) {
+ if (retries >= 0) {
+ // this may happen if we were able to establish connection once, but its no longer valid
+ LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+ testAndCreateConnection();
+ send(hCatEventMessage, topicName, retries - 1);
+ } else {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName +
+ " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+ }
}
}
/**
- * Get the topic object for the topicName, it also tries to reconnect
- * if the connection appears to be broken.
+ * Get the topic object for the topicName
*
* @param topicName The String identifying the message-topic.
* @return A {@link Topic} object corresponding to the specified topicName.
* @throws JMSException
*/
- protected Topic getTopic(final String topicName) throws JMSException {
- Topic topic;
+ protected Topic createTopic(final String topicName) throws JMSException {
+ return session.get().createTopic(topicName);
+ }
+
+ /**
+ * Does a health check on the connection by sending a dummy message.
+ * Create the connection if the connection is found to be bad
+ * Also recreates the session
+ */
+ protected synchronized void testAndCreateConnection() {
+ if (conn != null) {
+ // This method is reached when error occurs while sending msg, so the session must be bad
+ session.remove();
+ if (!isConnectionHealthy()) {
+ // I am the first thread to detect the error, cleanup old connection & reconnect
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS connection, ignored error", e);
+ }
+ conn = createConnection();
+ }
+ } else {
+ conn = createConnection();
+ }
try {
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise) {
- // this will happen if we were able to establish connection once, but its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
+ session.set(createSession());
+ } catch (JMSException e) {
+ LOG.error("Couldn't create JMS session, ignored the error", e);
}
- return topic;
}
- protected void createConnection() {
-
+ /**
+ * Create the JMS connection
+ * @return newly created JMS connection
+ */
+ protected Connection createConnection() {
+ LOG.info("Will create new JMS connection");
Context jndiCntxt;
+ Connection jmsConnection = null;
try {
jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
- .lookup("ConnectionFactory");
- Connection conn = connFac.createConnection();
- conn.start();
- conn.setExceptionListener(new ExceptionListener() {
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+ jmsConnection = connFac.createConnection();
+ jmsConnection.start();
+ jmsConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException jmse) {
- LOG.error(jmse.toString());
+ LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
}
});
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
} catch (NamingException e) {
LOG.error("JNDI error while setting up Message Bus connection. "
+ "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
} catch (Throwable t) {
LOG.error("Unable to connect to JMS provider", t);
}
+ return jmsConnection;
+ }
+
+ /**
+ * Send a dummy message to probe if the JMS connection is healthy
+ * @return true if connection is healthy, false otherwise
+ */
+ protected boolean isConnectionHealthy() {
+ try {
+ Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+ MessageProducer producer = createProducer(topic);
+ Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+ producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates a JMS session
+ * @return newly create JMS session
+ * @throws JMSException
+ */
+ protected Session createSession() throws JMSException {
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ return conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ /**
+ * Create a JMS producer
+ * @param topic
+ * @return newly created message producer
+ * @throws JMSException
+ */
+ protected MessageProducer createProducer(Destination topic) throws JMSException {
+ return session.get().createProducer(topic);
}
@Override
protected void finalize() throws Throwable {
- // Close the connection before dying.
- try {
- if (null != session)
- session.close();
- if (conn != null) {
+ if (conn != null) {
+ try {
conn.close();
+ } catch (Exception e) {
+ LOG.error("Couldn't close jms connection, ignored the error", e);
}
-
- } catch (Exception ignore) {
- LOG.info("Failed to close message bus connection.", ignore);
}
}
Modified: hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Wed May 8 04:43:16 2013
@@ -699,7 +699,9 @@ sub compare
}
- if (defined $testCmd->{'check_job_created'} || defined $testCmd->{'check_job_complete'}) {
+ if ( (defined $testCmd->{'check_job_created'})
+ || (defined $testCmd->{'check_job_complete'})
+ || (defined $testCmd->{'check_job_exit_value'}) ) {
my $jobid = $json_hash->{'id'};
if (!defined $jobid) {
print $log "$0::$subName INFO check failed: "
@@ -714,7 +716,7 @@ sub compare
. "jobresult not defined ";
$result = 0;
}
- if ($testCmd->{'check_job_complete'}) {
+ if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
my $jobComplete;
my $NUM_RETRIES = 60;
my $SLEEP_BETWEEN_RETRIES = 5;
@@ -736,11 +738,21 @@ sub compare
$result = 0;
} else {
# job has completed, check the runState value
- my $runState = $res_hash->{'status'}->{'runState'};
- my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
- if ( (!defined $runState) || $runState ne $runStateVal) {
- print $log "check_job_complete failed. got runState $runState, expected $runStateVal";
- $result = 0;
+ if (defined($testCmd->{'check_job_complete'})) {
+ my $runState = $res_hash->{'status'}->{'runState'};
+ my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
+ if ( (!defined $runState) || $runState ne $runStateVal) {
+ print $log "check_job_complete failed. got runState $runState, expected $runStateVal";
+ $result = 0;
+ }
+ }
+ if (defined($testCmd->{'check_job_exit_value'})) {
+ my $exitValue = $res_hash->{'exitValue'};
+ my $expectedExitValue = $testCmd->{'check_job_exit_value'};
+ if ( (!defined $exitValue) || $exitValue ne $expectedExitValue) {
+ print $log "check_job_exit_value failed. got exitValue $exitValue, expected $expectedExitValue";
+ $result = 0;
+ }
}
}
}
Modified: hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Wed May 8 04:43:16 2013
@@ -49,6 +49,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -107,6 +108,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
]
@@ -128,6 +130,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'FAILURE',
+ 'check_job_exit_value' => 8,
'check_call_back' => 1,
},
{
@@ -140,7 +143,9 @@ $cfg =
#results
'status_code' => 200,
'check_job_created' => 1,
+ 'check_job_exit_value' => 0,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -154,6 +159,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -167,6 +173,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -181,6 +188,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -197,6 +205,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -212,6 +221,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -257,6 +267,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'FAILURE',
+ 'check_job_exit_value' => 11,
},
@@ -271,6 +282,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -285,6 +297,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -298,6 +311,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -312,6 +326,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -326,6 +341,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -340,6 +356,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -356,6 +373,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
},
Modified: hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java (original)
+++ hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java Wed May 8 04:43:16 2013
@@ -28,12 +28,16 @@ import java.util.Comparator;
import java.util.jar.Attributes;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TGetCatalogsReq;
import org.apache.hive.service.cli.thrift.TGetCatalogsResp;
import org.apache.hive.service.cli.thrift.TGetColumnsReq;
import org.apache.hive.service.cli.thrift.TGetColumnsResp;
import org.apache.hive.service.cli.thrift.TGetFunctionsReq;
import org.apache.hive.service.cli.thrift.TGetFunctionsResp;
+import org.apache.hive.service.cli.thrift.TGetInfoReq;
+import org.apache.hive.service.cli.thrift.TGetInfoResp;
import org.apache.hive.service.cli.thrift.TGetSchemasReq;
import org.apache.hive.service.cli.thrift.TGetSchemasResp;
import org.apache.hive.service.cli.thrift.TGetTableTypesReq;
@@ -42,7 +46,6 @@ import org.apache.hive.service.cli.thrif
import org.apache.hive.service.cli.thrift.TGetTablesResp;
import org.apache.hive.service.cli.thrift.TGetTypeInfoReq;
import org.apache.hive.service.cli.thrift.TGetTypeInfoResp;
-import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TSessionHandle;
import org.apache.thrift.TException;
@@ -249,8 +252,17 @@ public class HiveDatabaseMetaData implem
}
public String getDatabaseProductVersion() throws SQLException {
- // TODO: Fetch this from the server side
- return "0.10.0";
+
+ TGetInfoReq req = new TGetInfoReq(sessHandle, GetInfoType.CLI_DBMS_VER.toTGetInfoType());
+ TGetInfoResp resp;
+ try {
+ resp = client.GetInfo(req);
+ } catch (TException e) {
+ throw new SQLException(e.getMessage(), "08S01", e);
+ }
+ Utils.verifySuccess(resp.getStatus());
+
+ return resp.getInfoValue().getStringValue();
}
public int getDefaultTransactionIsolation() throws SQLException {
Modified: hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java Wed May 8 04:43:16 2013
@@ -35,11 +35,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.common.util.HiveVersionInfo;
/**
* TestJdbcDriver2
@@ -833,7 +835,11 @@ public class TestJdbcDriver2 extends Tes
DatabaseMetaData meta = con.getMetaData();
assertEquals("Hive", meta.getDatabaseProductName());
- assertEquals("0.10.0", meta.getDatabaseProductVersion());
+ assertEquals(HiveVersionInfo.getVersion(), meta.getDatabaseProductVersion());
+ assertEquals(System.getProperty("hive.version"), meta.getDatabaseProductVersion());
+ assertTrue("verifying hive version pattern. got " + meta.getDatabaseProductVersion(),
+ Pattern.matches("\\d+\\.\\d+\\.\\d+.*", meta.getDatabaseProductVersion()) );
+
assertEquals(DatabaseMetaData.sqlStateSQL99, meta.getSQLStateType());
assertFalse(meta.supportsCatalogsInTableDefinitions());
assertFalse(meta.supportsSchemasInTableDefinitions());
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed May 8 04:43:16 2013
@@ -1060,6 +1060,10 @@ public class Driver implements CommandPr
conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
+
+ conf.set("mapreduce.workflow.id", "hive_"+queryId);
+ conf.set("mapreduce.workflow.name", queryStr);
+
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
try {
@@ -1338,6 +1342,8 @@ public class Driver implements CommandPr
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
}
+ conf.set("mapreduce.workflow.node.name", tsk.getId());
+ Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Wed May 8 04:43:16 2013
@@ -53,7 +53,6 @@ public abstract class AbstractMapJoinOpe
protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
protected transient byte posBigTable = -1; // one of the tables that is not in memory
- transient int mapJoinRowsKey; // rows for a given key
protected transient RowContainer<ArrayList<Object>> emptyList = null;
@@ -104,9 +103,6 @@ public abstract class AbstractMapJoinOpe
!hasFilter(posBigTable), reporter);
storage[posBigTable] = bigPosRC;
- mapJoinRowsKey = HiveConf.getIntVar(hconf,
- HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-
List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
.getAllStructFieldRefs();
if (conf.getOutputColumnNames().size() < structFields.size()) {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java Wed May 8 04:43:16 2013
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
/**
* FunctionInfo.
@@ -32,6 +34,8 @@ public class FunctionInfo {
private final boolean isNative;
+ private final boolean isInternalTableFunction;
+
private final String displayName;
private GenericUDF genericUDF;
@@ -40,11 +44,14 @@ public class FunctionInfo {
private GenericUDAFResolver genericUDAFResolver;
+ private Class<? extends TableFunctionResolver> tableFunctionResolver;
+
public FunctionInfo(boolean isNative, String displayName,
GenericUDF genericUDF) {
this.isNative = isNative;
this.displayName = displayName;
this.genericUDF = genericUDF;
+ this.isInternalTableFunction = false;
}
public FunctionInfo(boolean isNative, String displayName,
@@ -52,6 +59,7 @@ public class FunctionInfo {
this.isNative = isNative;
this.displayName = displayName;
this.genericUDAFResolver = genericUDAFResolver;
+ this.isInternalTableFunction = false;
}
public FunctionInfo(boolean isNative, String displayName,
@@ -59,6 +67,16 @@ public class FunctionInfo {
this.isNative = isNative;
this.displayName = displayName;
this.genericUDTF = genericUDTF;
+ this.isInternalTableFunction = false;
+ }
+
+ public FunctionInfo(String displayName, Class<? extends TableFunctionResolver> tFnCls)
+ {
+ this.displayName = displayName;
+ this.tableFunctionResolver = tFnCls;
+ PartitionTableFunctionDescription def = tableFunctionResolver.getAnnotation(PartitionTableFunctionDescription.class);
+ this.isNative = (def == null) ? false : def.isInternal();
+ this.isInternalTableFunction = isNative;
}
/**
@@ -90,6 +108,8 @@ public class FunctionInfo {
return genericUDAFResolver;
}
+
+
/**
* Get the Class of the UDF.
*/
@@ -109,6 +129,9 @@ public class FunctionInfo {
} else if (isGenericUDTF()) {
return genericUDTF.getClass();
}
+ if(isTableFunction()) {
+ return this.tableFunctionResolver;
+ }
return null;
}
@@ -131,6 +154,14 @@ public class FunctionInfo {
}
/**
+ * Internal table functions cannot be used in the language.
+ * {@link WindowingTableFunction}
+ */
+ public boolean isInternalTableFunction() {
+ return isInternalTableFunction;
+ }
+
+ /**
* @return TRUE if the function is a GenericUDF
*/
public boolean isGenericUDF() {
@@ -150,4 +181,11 @@ public class FunctionInfo {
public boolean isGenericUDTF() {
return null != genericUDTF;
}
+
+ /**
+ * @return TRUE if the function is a Table Function
+ */
+ public boolean isTableFunction() {
+ return null != tableFunctionResolver;
+ }
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed May 8 04:43:16 2013
@@ -182,7 +182,6 @@ public final class FunctionRegistry {
public static final String NOOP_TABLE_FUNCTION = "noop";
public static final String NOOP_MAP_TABLE_FUNCTION = "noopwithmap";
- static Map<String, PTFFunctionInfo> tableFunctions = Collections.synchronizedMap(new LinkedHashMap<String, PTFFunctionInfo>());
static Map<String, WindowFunctionInfo> windowFunctions = Collections.synchronizedMap(new LinkedHashMap<String, WindowFunctionInfo>());
/*
@@ -1294,6 +1293,9 @@ public final class FunctionRegistry {
FunctionRegistry.registerTemporaryGenericUDAF(
functionName, (GenericUDAFResolver)
ReflectionUtils.newInstance(udfClass, null));
+ } else if(TableFunctionResolver.class.isAssignableFrom(udfClass)) {
+ FunctionRegistry.registerTableFunction(
+ functionName, (Class<? extends TableFunctionResolver>)udfClass);
} else {
return false;
}
@@ -1406,14 +1408,17 @@ public final class FunctionRegistry {
public static boolean isTableFunction(String name)
{
- PTFFunctionInfo tFInfo = tableFunctions.get(name.toLowerCase());
- return tFInfo != null && !tFInfo.isInternal();
+ FunctionInfo tFInfo = mFunctions.get(name.toLowerCase());
+ return tFInfo != null && !tFInfo.isInternalTableFunction() && tFInfo.isTableFunction();
}
public static TableFunctionResolver getTableFunctionResolver(String name)
{
- PTFFunctionInfo tfInfo = tableFunctions.get(name.toLowerCase());
- return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionResolver(), null);
+ FunctionInfo tfInfo = mFunctions.get(name.toLowerCase());
+ if (tfInfo != null && tfInfo.isTableFunction()) { // unknown or non-table-function names yield null
+ return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionClass(), null);
+ }
+ return null;
}
public static TableFunctionResolver getWindowingTableFunction()
@@ -1428,8 +1433,8 @@ public final class FunctionRegistry {
public static void registerTableFunction(String name, Class<? extends TableFunctionResolver> tFnCls)
{
- PTFFunctionInfo tInfo = new PTFFunctionInfo(name, tFnCls);
- tableFunctions.put(name.toLowerCase(), tInfo);
+ FunctionInfo tInfo = new FunctionInfo(name, tFnCls);
+ mFunctions.put(name.toLowerCase(), tInfo);
}
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed May 8 04:43:16 2013
@@ -151,35 +151,13 @@ public class GroupByOperator extends Ope
private List<FastBitSet> groupingSetsBitSet;
transient private List<Object> newKeysGroupingSets;
- /**
- * This is used to store the position and field names for variable length
- * fields.
- **/
- class varLenFields {
- int aggrPos;
- List<Field> fields;
-
- varLenFields(int aggrPos, List<Field> fields) {
- this.aggrPos = aggrPos;
- this.fields = fields;
- }
-
- int getAggrPos() {
- return aggrPos;
- }
-
- List<Field> getFields() {
- return fields;
- }
- };
-
// for these positions, some variable primitive type (String) is used, so size
// cannot be estimated. sample it at runtime.
transient List<Integer> keyPositionsSize;
// for these positions, some variable primitive type (String) is used for the
// aggregation classes
- transient List<varLenFields> aggrPositions;
+ transient List<Field>[] aggrPositions;
transient int fixedRowSize;
transient long maxHashTblMemory;
@@ -383,7 +361,7 @@ public class GroupByOperator extends Ope
aggregations = newAggregations();
hashAggr = true;
keyPositionsSize = new ArrayList<Integer>();
- aggrPositions = new ArrayList<varLenFields>();
+ aggrPositions = new List[aggregations.length];
groupbyMapAggrInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
@@ -523,21 +501,10 @@ public class GroupByOperator extends Ope
}
if (c.isInstance(new String()) || c.isInstance(new ByteArrayRef())) {
- int idx = 0;
- varLenFields v = null;
- for (idx = 0; idx < aggrPositions.size(); idx++) {
- v = aggrPositions.get(idx);
- if (v.getAggrPos() == pos) {
- break;
- }
+ if (aggrPositions[pos] == null) {
+ aggrPositions[pos] = new ArrayList<Field>();
}
-
- if (idx == aggrPositions.size()) {
- v = new varLenFields(pos, new ArrayList<Field>());
- aggrPositions.add(v);
- }
-
- v.getFields().add(f);
+ aggrPositions[pos].add(f);
return javaObjectOverHead;
}
@@ -582,9 +549,11 @@ public class GroupByOperator extends Ope
for (int i = 0; i < aggregationEvaluators.length; i++) {
fixedRowSize += javaObjectOverHead;
- Class<? extends AggregationBuffer> agg = aggregationEvaluators[i]
- .getNewAggregationBuffer().getClass();
- Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
+ AggregationBuffer agg = aggregationEvaluators[i].getNewAggregationBuffer();
+ if (GenericUDAFEvaluator.isEstimable(agg)) {
+ continue;
+ }
+ Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg.getClass());
for (Field f : fArr) {
fixedRowSize += getSize(i, f.getType(), f);
}
@@ -968,29 +937,15 @@ public class GroupByOperator extends Ope
}
}
- AggregationBuffer[] aggs = null;
- if (aggrPositions.size() > 0) {
- KeyWrapper newKeyProber = newKeys.copyKey();
- aggs = hashAggregations.get(newKeyProber);
- }
-
- for (varLenFields v : aggrPositions) {
- int aggrPos = v.getAggrPos();
- List<Field> fieldsVarLen = v.getFields();
- AggregationBuffer agg = aggs[aggrPos];
-
- try {
- for (Field f : fieldsVarLen) {
- Object o = f.get(agg);
- if (o instanceof String){
- totalVariableSize += ((String)o).length();
- }
- else if (o instanceof ByteArrayRef){
- totalVariableSize += ((ByteArrayRef)o).getData().length;
- }
- }
- } catch (IllegalAccessException e) {
- assert false;
+ AggregationBuffer[] aggs = hashAggregations.get(newKeys);
+ for (int i = 0; i < aggs.length; i++) {
+ AggregationBuffer agg = aggs[i];
+ if (GenericUDAFEvaluator.isEstimable(agg)) {
+ totalVariableSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer)agg).estimate();
+ continue;
+ }
+ if (aggrPositions[i] != null) {
+ totalVariableSize += estimateSize(agg, aggrPositions[i]);
}
}
@@ -1010,6 +965,24 @@ public class GroupByOperator extends Ope
return false;
}
+ private int estimateSize(AggregationBuffer agg, List<Field> fields) {
+ int length = 0;
+ for (Field f : fields) {
+ try {
+ Object o = f.get(agg);
+ if (o instanceof String){
+ length += ((String)o).length();
+ }
+ else if (o instanceof ByteArrayRef){
+ length += ((ByteArrayRef)o).getData().length;
+ }
+ } catch (Exception e) {
+ // continue.. null out the field?
+ }
+ }
+ return length;
+ }
+
private void flush(boolean complete) throws HiveException {
countAfterReport = 0;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Wed May 8 04:43:16 2013
@@ -70,7 +70,6 @@ public class HashTableSinkOperator exten
protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
- transient int mapJoinRowsKey; // rows for a given key
protected transient RowContainer<ArrayList<Object>> emptyList = null;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed May 8 04:43:16 2013
@@ -121,6 +121,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.api.Adjacency;
+import org.apache.hadoop.hive.ql.plan.api.Graph;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -229,6 +231,25 @@ public final class Utilities {
}
}
+ public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
+ try {
+ Graph stageGraph = plan.getQueryPlan().getStageGraph();
+ if (stageGraph == null)
+ return;
+ List<Adjacency> adjList = stageGraph.getAdjacencyList();
+ if (adjList == null)
+ return;
+ for (Adjacency adj : adjList) {
+ List<String> children = adj.getChildren();
+ if (children == null || children.isEmpty())
+ continue; // node without children: nothing to record, keep scanning the remaining nodes
+ conf.setStrings("mapreduce.workflow.adjacency."+adj.getNode(),
+ children.toArray(new String[children.size()]));
+ }
+ } catch (IOException ignored) { // intentionally best-effort: adjacency keys are optional workflow metadata
+ }
+ }
+
public static List<String> getFieldSchemaString(List<FieldSchema> fl) {
if (fl == null) {
return null;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed May 8 04:43:16 2013
@@ -140,7 +140,200 @@ import org.apache.hadoop.util.Reflection
* </ul>
* </li>
* </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
*
+ * rcfile ::=
+ * <file-header>
+ * <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ * <file-version-header>
+ * <file-key-class-name> (only exists if version is seq6)
+ * <file-value-class-name> (only exists if version is seq6)
+ * <file-is-compressed>
+ * <file-is-block-compressed> (only exists if version is seq6)
+ * [<file-compression-codec-class>]
+ * <file-header-metadata>
+ * <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ * | Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using Hadoop Writable serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ * <rowgroup-header>
+ * <rowgroup-key-data>
+ * <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ * [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ * <rowgroup-record-length>
+ * <rowgroup-key-length>
+ * <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ * <rowgroup-num-rows>
+ * <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ * <column-buffer-length>
+ * <column-buffer-uncompressed-length>
+ * <column-key-buffer-length>
+ * <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-column-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer as written by Hadoop's WritableUtils.
+ * Values in [-112, 127] occupy a single byte; otherwise a first byte
+ * encodes the sign and the number of following bytes, which hold the
+ * value (one's-complemented if negative) in big-endian order.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
*/
public class RCFile {
@@ -1385,21 +1578,31 @@ public class RCFile {
try {
seek(position + 4); // skip escape
- in.readFully(syncCheck);
- int syncLen = sync.length;
- for (int i = 0; in.getPos() < end; i++) {
- int j = 0;
- for (; j < syncLen; j++) {
- if (sync[j] != syncCheck[(i + j) % syncLen]) {
- break;
+
+ int prefix = sync.length;
+ int n = conf.getInt("io.bytes.per.checksum", 512);
+ byte[] buffer = new byte[prefix+n];
+ n = (int)Math.min(n, end - in.getPos());
+ /* fill array with a pattern that will never match sync */
+ Arrays.fill(buffer, (byte)(~sync[0]));
+ while(n > 0 && (in.getPos() + n) <= end) {
+ position = in.getPos();
+ in.readFully(buffer, prefix, n);
+ /* the buffer has n+sync bytes */
+ for(int i = 0; i < n; i++) {
+ int j;
+ for(j = 0; j < sync.length && sync[j] == buffer[i+j]; j++) {
+ /* nothing */
+ }
+ if(j == sync.length) {
+ /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+ in.seek(position + i - SYNC_SIZE);
+ return;
}
}
- if (j == syncLen) {
- in.seek(in.getPos() - SYNC_SIZE); // position before
- // sync
- return;
- }
- syncCheck[i % syncLen] = in.readByte();
+ /* carry the last `prefix` bytes just read, buffer[n .. n+prefix), into the prefix area */
+ System.arraycopy(buffer, n, buffer, 0, prefix);
+ n = (int)Math.min(n, end - in.getPos());
}
} catch (ChecksumException e) { // checksum failure
handleChecksumException(e);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java Wed May 8 04:43:16 2013
@@ -44,7 +44,7 @@ public class ColumnAccessAnalyzer {
for (TableScanOperator op : topOps.keySet()) {
Table table = topOps.get(op);
String tableName = table.getCompleteName();
- List<FieldSchema> tableCols = table.getAllCols();
+ List<FieldSchema> tableCols = table.getCols();
for (int i : op.getNeededColumnIDs()) {
columnAccessInfo.add(tableName, tableCols.get(i).getName());
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Wed May 8 04:43:16 2013
@@ -172,7 +172,7 @@ tableSample
tableSource
@init { gParent.msgs.push("table source"); }
@after { gParent.msgs.pop(); }
- : tabname=tableName (ts=tableSample)? (alias=identifier)?
+ : tabname=tableName (ts=tableSample)? (KW_AS? alias=identifier)?
-> ^(TOK_TABREF $tabname $ts? $alias?)
;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Wed May 8 04:43:16 2013
@@ -1878,7 +1878,7 @@ destination
@init { msgs.push("destination specification"); }
@after { msgs.pop(); }
:
- KW_LOCAL KW_DIRECTORY StringLiteral -> ^(TOK_LOCAL_DIR StringLiteral)
+ KW_LOCAL KW_DIRECTORY StringLiteral tableRowFormat? tableFileFormat? -> ^(TOK_LOCAL_DIR StringLiteral tableRowFormat? tableFileFormat?)
| KW_DIRECTORY StringLiteral -> ^(TOK_DIR StringLiteral)
| KW_TABLE tableOrPartition -> tableOrPartition
;
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Wed May 8 04:43:16 2013
@@ -51,6 +51,7 @@ public class QB {
private boolean isQuery;
private boolean isAnalyzeRewrite;
private CreateTableDesc tblDesc = null; // table descriptor of the final
+ private CreateTableDesc localDirectoryDesc = null ;
// used by PTFs
/*
@@ -227,6 +228,14 @@ public class QB {
tblDesc = desc;
}
+ public CreateTableDesc getLLocalDirectoryDesc() {
+ return localDirectoryDesc;
+ }
+
+ public void setLocalDirectoryDesc(CreateTableDesc localDirectoryDesc) {
+ this.localDirectoryDesc = localDirectoryDesc;
+ }
+
/**
* Whether this QB is for a CREATE-TABLE-AS-SELECT.
*/
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed May 8 04:43:16 2013
@@ -1192,6 +1192,10 @@ public class SemanticAnalyzer extends Ba
}
}
+ RowFormatParams rowFormatParams = new RowFormatParams();
+ AnalyzeCreateCommonVars shared = new AnalyzeCreateCommonVars();
+ StorageFormat storageFormat = new StorageFormat();
+
LOG.info("Get metadata for destination tables");
// Go over all the destination structures and populate the related
// metadata
@@ -1279,6 +1283,45 @@ public class SemanticAnalyzer extends Ba
}
qb.getMetaData().setDestForAlias(name, fname,
(ast.getToken().getType() == HiveParser.TOK_DIR));
+
+ CreateTableDesc localDirectoryDesc = new CreateTableDesc();
+ boolean localDirectoryDescIsSet = false;
+ int numCh = ast.getChildCount();
+ for (int num = 1; num < numCh ; num++){
+ ASTNode child = (ASTNode) ast.getChild(num);
+ if (ast.getChild(num) != null){
+ switch (child.getToken().getType()) {
+ case HiveParser.TOK_TABLEROWFORMAT:
+ rowFormatParams.analyzeRowFormat(shared, child);
+ localDirectoryDesc.setFieldDelim(rowFormatParams.fieldDelim);
+ localDirectoryDesc.setLineDelim(rowFormatParams.lineDelim);
+ localDirectoryDesc.setCollItemDelim(rowFormatParams.collItemDelim);
+ localDirectoryDesc.setMapKeyDelim(rowFormatParams.mapKeyDelim);
+ localDirectoryDesc.setFieldEscape(rowFormatParams.fieldEscape);
+ localDirectoryDescIsSet=true;
+ break;
+ case HiveParser.TOK_TABLESERIALIZER:
+ ASTNode serdeChild = (ASTNode) child.getChild(0);
+ shared.serde = unescapeSQLString(serdeChild.getChild(0).getText());
+ localDirectoryDesc.setSerName(shared.serde);
+ localDirectoryDescIsSet=true;
+ break;
+ case HiveParser.TOK_TBLSEQUENCEFILE:
+ case HiveParser.TOK_TBLTEXTFILE:
+ case HiveParser.TOK_TBLRCFILE:
+ case HiveParser.TOK_TBLORCFILE:
+ case HiveParser.TOK_TABLEFILEFORMAT:
+ storageFormat.fillStorageFormat(child, shared);
+ localDirectoryDesc.setOutputFormat(storageFormat.outputFormat);
+ localDirectoryDesc.setSerName(shared.serde);
+ localDirectoryDescIsSet=true;
+ break;
+ }
+ }
+ }
+ if (localDirectoryDescIsSet){
+ qb.setLocalDirectoryDesc(localDirectoryDesc);
+ }
break;
}
default:
@@ -5180,8 +5223,7 @@ public class SemanticAnalyzer extends Ba
String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat);
} else {
- table_desc = PlanUtils.getDefaultTableDesc(Integer
- .toString(Utilities.ctrlaCode), cols, colTypes, false);
+ table_desc = PlanUtils.getDefaultTableDesc(qb.getLLocalDirectoryDesc(), cols, colTypes);
}
} else {
table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Wed May 8 04:43:16 2013
@@ -100,6 +100,56 @@ public final class PlanUtils {
}
}
+ public static TableDesc getDefaultTableDesc(CreateTableDesc localDirectoryDesc, // Builds the default TableDesc (Utilities.ctrlaCode separator), then overlays any delimiters, serde and output format set on localDirectoryDesc; returns null if a named class cannot be loaded.
+ String cols, String colTypes ) {
+ TableDesc tableDesc = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols,
+ colTypes, false);; // NOTE(review): stray second ';' (empty statement) -- harmless, should be removed.
+ if (localDirectoryDesc == null) { // no row/file format was recorded for the destination: keep the defaults unchanged
+ return tableDesc;
+ }
+
+ try {
+ if (localDirectoryDesc.getFieldDelim() != null) { // the field delimiter is written under both FIELD_DELIM and SERIALIZATION_FORMAT
+ tableDesc.getProperties().setProperty(
+ serdeConstants.FIELD_DELIM, localDirectoryDesc.getFieldDelim());
+ tableDesc.getProperties().setProperty(
+ serdeConstants.SERIALIZATION_FORMAT, localDirectoryDesc.getFieldDelim());
+ }
+ if (localDirectoryDesc.getLineDelim() != null) {
+ tableDesc.getProperties().setProperty(
+ serdeConstants.LINE_DELIM, localDirectoryDesc.getLineDelim());
+ }
+ if (localDirectoryDesc.getCollItemDelim() != null) {
+ tableDesc.getProperties().setProperty(
+ serdeConstants.COLLECTION_DELIM, localDirectoryDesc.getCollItemDelim());
+ }
+ if (localDirectoryDesc.getMapKeyDelim() != null) {
+ tableDesc.getProperties().setProperty(
+ serdeConstants.MAPKEY_DELIM, localDirectoryDesc.getMapKeyDelim());
+ }
+ if (localDirectoryDesc.getFieldEscape() !=null) {
+ tableDesc.getProperties().setProperty(
+ serdeConstants.ESCAPE_CHAR, localDirectoryDesc.getFieldEscape());
+ }
+ if (localDirectoryDesc.getSerName() != null) { // a custom serde replaces the serde class name, the SERIALIZATION_LIB property and the deserializer class
+ tableDesc.setSerdeClassName(localDirectoryDesc.getSerName());
+ tableDesc.getProperties().setProperty(
+ serdeConstants.SERIALIZATION_LIB, localDirectoryDesc.getSerName());
+ tableDesc.setDeserializerClass( // NOTE(review): unchecked cast below -- a loaded class that is not a Deserializer is not rejected here.
+ (Class<? extends Deserializer>) Class.forName(localDirectoryDesc.getSerName())); // NOTE(review): Class.forName(String) uses PlanUtils' defining class loader -- confirm user-added serde jars are visible to it.
+ }
+ if (localDirectoryDesc.getOutputFormat() != null){
+ tableDesc.setOutputFileFormatClass(Class.forName(localDirectoryDesc.getOutputFormat())); // same class-loader caveat as the serde lookup above
+ }
+ } catch (ClassNotFoundException e) { // NOTE(review): failure is only printed to stderr; the visible SemanticAnalyzer caller assigns the result without a null check.
+ // mimicking behaviour in CreateTableDesc tableDesc creation
+ // returning null table description for output.
+ e.printStackTrace();
+ return null;
+ }
+ return tableDesc;
+ }
+
/**
* Generate the table descriptor of MetadataTypedColumnsetSerDe with the
* separatorCode and column names (comma separated string).
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java Wed May 8 04:43:16 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.UD
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -303,6 +304,14 @@ public class GenericUDAFAverage extends
}
}
+ @AggregationType(estimable = true)
+ static class AverageAgg extends AbstractAggregationBuffer { // Aggregation buffer holding a long count and a double sum. NOTE(review): not referenced in the visible code -- reset() below casts to AverageAggregationBuffer<TYPE>; confirm this class is needed.
+ long count;
+ double sum;
+ @Override
+ public int estimate() { return JavaDataModel.PRIMITIVES2 * 2; } // the long and the double, each counted as one PRIMITIVES2-sized field
+ }; // NOTE(review): trailing ';' is an empty member declaration -- legal but unnecessary.
+
@Override
public void reset(AggregationBuffer aggregation) throws HiveException {
doReset((AverageAggregationBuffer<TYPE>)aggregation);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java Wed May 8 04:43:16 2013
@@ -147,7 +147,7 @@ public class GenericUDAFBridge extends A
}
/** class for storing UDAFEvaluator value. */
- static class UDAFAgg implements AggregationBuffer {
+ static class UDAFAgg extends AbstractAggregationBuffer {
UDAFEvaluator ueObject;
UDAFAgg(UDAFEvaluator ueObject) {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java Wed May 8 04:43:16 2013
@@ -99,7 +99,7 @@ public class GenericUDAFCollectSet exten
}
}
- static class MkArrayAggregationBuffer implements AggregationBuffer {
+ static class MkArrayAggregationBuffer extends AbstractAggregationBuffer { // NOTE(review): unlike the buffers this commit annotates with @AggregationType(estimable = true), no annotation or estimate() override here -- confirm intended.
Set<Object> container;
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java Wed May 8 04:43:16 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -185,11 +186,17 @@ public class GenericUDAFComputeStats ext
foi);
}
- public static class BooleanStatsAgg implements AggregationBuffer {
+ @AggregationType(estimable = true)
+ public static class BooleanStatsAgg extends AbstractAggregationBuffer {
public String columnType; /* Datatype of column */
public long countTrues; /* Count of number of true values seen so far */
public long countFalses; /* Count of number of false values seen so far */
public long countNulls; /* Count of number of null values seen so far */
+ @Override
+ public int estimate() {
+ JavaDataModel model = JavaDataModel.get();
+ return model.primitive2() * 3 + model.lengthFor(columnType); // three long counters (countTrues, countFalses, countNulls) plus the columnType string
+ }
};
@Override
@@ -426,7 +433,9 @@ public class GenericUDAFComputeStats ext
}
}
- public static class LongStatsAgg implements AggregationBuffer {
+
+ @AggregationType(estimable = true)
+ public static class LongStatsAgg extends AbstractAggregationBuffer {
public String columnType;
public long min; /* Minimum value seen so far */
public long max; /* Maximum value seen so far */
@@ -434,6 +443,12 @@ public class GenericUDAFComputeStats ext
public LongNumDistinctValueEstimator numDV; /* Distinct value estimator */
public boolean firstItem; /* First item in the aggBuf? */
public int numBitVectors;
+ @Override
+ public int estimate() {
+ JavaDataModel model = JavaDataModel.get();
+ return model.primitive1() * 2 + model.primitive2() * 3 +
+ model.lengthFor(columnType) + model.lengthFor(numDV);
+ }
};
@Override
@@ -738,7 +753,8 @@ public class GenericUDAFComputeStats ext
}
}
- public static class DoubleStatsAgg implements AggregationBuffer {
+ @AggregationType(estimable = true)
+ public static class DoubleStatsAgg extends AbstractAggregationBuffer {
public String columnType;
public double min; /* Minimum value seen so far */
public double max; /* Maximum value seen so far */
@@ -746,6 +762,12 @@ public class GenericUDAFComputeStats ext
public DoubleNumDistinctValueEstimator numDV; /* Distinct value estimator */
public boolean firstItem; /* First item in the aggBuf? */
public int numBitVectors;
+ @Override
+ public int estimate() {
+ JavaDataModel model = JavaDataModel.get();
+ return model.primitive1() * 2 + model.primitive2() * 3 +
+ model.lengthFor(columnType) + model.lengthFor(numDV);
+ }
};
@Override
@@ -1061,7 +1083,8 @@ public class GenericUDAFComputeStats ext
}
}
- public static class StringStatsAgg implements AggregationBuffer {
+ @AggregationType(estimable = true)
+ public static class StringStatsAgg extends AbstractAggregationBuffer {
public String columnType;
public long maxLength; /* Maximum length seen so far */
public long sumLength; /* Sum of lengths of all values seen so far */
@@ -1070,6 +1093,12 @@ public class GenericUDAFComputeStats ext
public StringNumDistinctValueEstimator numDV; /* Distinct value estimator */
public int numBitVectors;
public boolean firstItem;
+ @Override
+ public int estimate() {
+ JavaDataModel model = JavaDataModel.get();
+ return model.primitive1() * 2 + model.primitive2() * 4 +
+ model.lengthFor(columnType) + model.lengthFor(numDV);
+ }
};
@Override
@@ -1377,12 +1406,18 @@ public class GenericUDAFComputeStats ext
}
}
- public static class BinaryStatsAgg implements AggregationBuffer {
+ @AggregationType(estimable = true)
+ public static class BinaryStatsAgg extends AbstractAggregationBuffer {
public String columnType;
public long maxLength; /* Maximum length seen so far */
public long sumLength; /* Sum of lengths of all values seen so far */
public long count; /* Count of all values seen so far */
public long countNulls; /* Count of number of null values seen so far */
+ @Override
+ public int estimate() {
+ JavaDataModel model = JavaDataModel.get();
+ return model.primitive2() * 4 + model.lengthFor(columnType); // four long fields (maxLength, sumLength, count, countNulls) plus the columnType string
+ }
};
@Override
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java Wed May 8 04:43:16 2013
@@ -400,7 +400,7 @@ public class GenericUDAFContextNGrams im
// Aggregation buffer methods.
- static class NGramAggBuf implements AggregationBuffer {
+ static class NGramAggBuf extends AbstractAggregationBuffer { // NOTE(review): unlike the buffers this commit annotates with @AggregationType(estimable = true), no annotation or estimate() override here -- confirm intended.
ArrayList<String> context;
NGramEstimator nge;
};
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java Wed May 8 04:43:16 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -247,13 +248,16 @@ public class GenericUDAFCorrelation exte
}
}
- static class StdAgg implements AggregationBuffer {
+ @AggregationType(estimable = true)
+ static class StdAgg extends AbstractAggregationBuffer {
long count; // number n of elements
double xavg; // average of x elements
double yavg; // average of y elements
double xvar; // n times the variance of x elements
double yvar; // n times the variance of y elements
double covar; // n times the covariance
+ @Override
+ public int estimate() { return JavaDataModel.PRIMITIVES2 * 6; } // one long + five doubles = six PRIMITIVES2-sized fields
};
@Override