Posted to commits@hive.apache.org by am...@apache.org on 2013/05/08 06:43:18 UTC

svn commit: r1480158 [1/2] - in /hive/branches/HIVE-4115: ./ common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ hcatalog/ hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/ hcatalog/src/test/e2e/templeton/drivers...

Author: amareshwari
Date: Wed May  8 04:43:16 2013
New Revision: 1480158

URL: http://svn.apache.org/r1480158
Log:
Merging r1476037 through r1480157 into HIVE-4115

Added:
    hive/branches/HIVE-4115/data/files/array_table.txt
      - copied unchanged from r1480157, hive/trunk/data/files/array_table.txt
    hive/branches/HIVE-4115/data/files/map_table.txt
      - copied unchanged from r1480157, hive/trunk/data/files/map_table.txt
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
      - copied unchanged from r1480157, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/insert_overwrite_local_directory_1.q
      - copied unchanged from r1480157, hive/trunk/ql/src/test/queries/clientpositive/insert_overwrite_local_directory_1.q
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/ptf_register_tblfn.q
      - copied unchanged from r1480157, hive/trunk/ql/src/test/queries/clientpositive/ptf_register_tblfn.q
    hive/branches/HIVE-4115/ql/src/test/results/clientpositive/insert_overwrite_local_directory_1.q.out
      - copied unchanged from r1480157, hive/trunk/ql/src/test/results/clientpositive/insert_overwrite_local_directory_1.q.out
    hive/branches/HIVE-4115/ql/src/test/results/clientpositive/ptf_register_tblfn.q.out
      - copied unchanged from r1480157, hive/trunk/ql/src/test/results/clientpositive/ptf_register_tblfn.q.out
Removed:
    hive/branches/HIVE-4115/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java.orig
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFFunctionInfo.java
Modified:
    hive/branches/HIVE-4115/   (props changed)
    hive/branches/HIVE-4115/build.properties
    hive/branches/HIVE-4115/build.xml
    hive/branches/HIVE-4115/common/build.xml
    hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/HIVE-4115/conf/hive-default.xml.template
    hive/branches/HIVE-4115/hcatalog/build.xml
    hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
    hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
    hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
    hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java
    hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCovariance.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEWAHBitmap.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFEvaluator.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
    hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/column_access_stats.q
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/input4.q
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/join39.q
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/join40.q
    hive/branches/HIVE-4115/ql/src/test/queries/clientpositive/mapjoin1.q
    hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/join39.q.out
    hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/join40.q.out
    hive/branches/HIVE-4115/ql/src/test/results/beelinepositive/mapjoin1.q.out
    hive/branches/HIVE-4115/ql/src/test/results/clientpositive/column_access_stats.q.out
    hive/branches/HIVE-4115/ql/src/test/results/clientpositive/input4.q.out
    hive/branches/HIVE-4115/ql/src/test/results/clientpositive/show_functions.q.out
    hive/branches/HIVE-4115/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java

Propchange: hive/branches/HIVE-4115/
------------------------------------------------------------------------------
  Merged /hive/trunk:r1476037-1480157

Modified: hive/branches/HIVE-4115/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/build.properties?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/build.properties (original)
+++ hive/branches/HIVE-4115/build.properties Wed May  8 04:43:16 2013
@@ -75,7 +75,7 @@ common.jar=${hadoop.root}/lib/commons-ht
 # module names needed for build process
 iterate.hive.all=ant,shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
 iterate.hive.modules=shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
-iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,metastore,odbc,serde,service,hcatalog
+iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,beeline,metastore,odbc,serde,service,hcatalog
 iterate.hive.thrift=ql,service,metastore,serde
 iterate.hive.protobuf=ql
 iterate.hive.cpp=odbc

Modified: hive/branches/HIVE-4115/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/build.xml (original)
+++ hive/branches/HIVE-4115/build.xml Wed May  8 04:43:16 2013
@@ -506,8 +506,8 @@
     <!-- Package the hcat stuff and pull it up into Hive's build dir -->
     <ant antfile="${hive.root}/hcatalog/build.xml" target="package"
         inheritAll="false"/>
-    <mkdir dir="${build.dir.hive}/hcatalog"/>
-    <copy todir="${build.dir.hive}/hcatalog">
+    <mkdir dir="${target.dir}/hcatalog"/>
+    <copy todir="${target.dir}/hcatalog">
         <fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
     </copy>
   </target>
@@ -711,6 +711,7 @@
         <tarfileset dir="${hive.root}" mode="664" prefix="${tar.final.name}/src"
                     excludes="${vcs.excludes}">
           <exclude name="build/**" />
+          <exclude name="hcatalog/**/build/**" />
           <exclude name="bin/**" />
           <exclude name="**/py/**/*-remote" />
           <exclude name="data/scripts/**" />
@@ -720,6 +721,7 @@
         <tarfileset dir="${hive.root}" mode="755" prefix="${tar.final.name}/src"
                     excludes="${vcs.excludes}">
           <exclude name="build/**" />
+          <exclude name="hcatalog/**/build/**" />
           <include name="bin/**" />
           <include name="**/py/**/*-remote" />
           <include name="data/scripts/**" />
@@ -750,9 +752,6 @@
           <exclude name="docs/**"/>
           <exclude name="lib/py/**/*-remote"/>
         </tarfileset>
-        <tarfileset dir="${build.dir.hive}/hcatalog" mode="755" prefix="${bin.final.name}"
-                    excludes="${vcs.excludes}">
-        </tarfileset>
       </param.listofitems>
     </macro_tar>
   </target>

Modified: hive/branches/HIVE-4115/common/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/common/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/common/build.xml (original)
+++ hive/branches/HIVE-4115/common/build.xml Wed May  8 04:43:16 2013
@@ -25,6 +25,7 @@ to call at top-level: ant deploy-contrib
 <project name="common" default="jar">
 
   <property name="src.dir"  location="${basedir}/src/java"/>
+  <property name="src.gen.dir"  location="${basedir}/src/gen"/>
   <import file="../build-common.xml"/>
 
   <target name="compile" depends="init, setup, ivy-retrieve">
@@ -36,7 +37,7 @@ to call at top-level: ant deploy-contrib
     </exec>
     <javac
      encoding="${build.encoding}"
-     srcdir="${src.dir}"
+     srcdir="${src.dir}:${src.gen.dir}"
      includes="**/*.java"
      destdir="${build.classes}"
      debug="${javac.debug}"

Modified: hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/HIVE-4115/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed May  8 04:43:16 2013
@@ -415,8 +415,6 @@ public class HiveConf extends Configurat
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
     HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
     HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
-    HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
-    HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
     HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5),
     HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),

Modified: hive/branches/HIVE-4115/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/conf/hive-default.xml.template?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/conf/hive-default.xml.template (original)
+++ hive/branches/HIVE-4115/conf/hive-default.xml.template Wed May  8 04:43:16 2013
@@ -595,12 +595,6 @@
 </property>
 
 <property>
-  <name>hive.mapjoin.cache.numrows</name>
-  <value>25000</value>
-  <description>How many rows should be cached by jdbm for map join. </description>
-</property>
-
-<property>
   <name>hive.optimize.skewjoin</name>
   <value>false</value>
   <description>Whether to enable skew join optimization.

Modified: hive/branches/HIVE-4115/hcatalog/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/build.xml?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/build.xml (original)
+++ hive/branches/HIVE-4115/hcatalog/build.xml Wed May  8 04:43:16 2013
@@ -91,18 +91,23 @@
     <target name="gen-test" description="Generate tests, a no-op for hcat"/>
 
     <target name="test" depends="jar" description="run unit tests">
-        <ant target="test" dir="core" inheritAll="false"/>
-        <ant target="test" dir="hcatalog-pig-adapter" inheritAll="false"/>
-        <ant target="test" dir="server-extensions" inheritAll="false"/>
-        <ant target="test" dir="webhcat/svr" inheritAll="false"/>
-        <ant target="test" dir="webhcat/java-client" inheritAll="false"/>
-        <ant target="test" dir="storage-handlers/hbase" inheritAll="false"/>
-        <!-- One checkstyle run for the whole repo. Runs after junit tests
-        to piggyback on resolved jars. -->
-        <path id="checkstyle.class.path">
-          <fileset dir="core/build/lib/test"/>
-        </path>
-        <antcall target="checkstyle" inheritRefs="true"/>
+        <!-- Placed in a parallel structure so that the tests keep going
+             even if some fail.  Otherwise a failure in one of the earlier ant
+             calls terminates the target and the rest do not run.  -->
+        <parallel threadCount="1">
+            <ant target="test" dir="core" inheritAll="false"/>
+            <ant target="test" dir="hcatalog-pig-adapter" inheritAll="false"/>
+            <ant target="test" dir="server-extensions" inheritAll="false"/>
+            <ant target="test" dir="webhcat/svr" inheritAll="false"/>
+            <ant target="test" dir="webhcat/java-client" inheritAll="false"/>
+            <ant target="test" dir="storage-handlers/hbase" inheritAll="false"/>
+            <!-- One checkstyle run for the whole repo. Runs after junit tests
+            to piggyback on resolved jars. -->
+            <path id="checkstyle.class.path">
+                <fileset dir="core/build/lib/test"/>
+            </path>
+            <antcall target="checkstyle" inheritRefs="true"/>
+        </parallel>
     </target>
 
     <target name="compile-test" depends="jar" description="compile unit tests">

Modified: hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/branches/HIVE-4115/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Wed May  8 04:43:16 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
 public class NotificationListener extends MetaStoreEventListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
-    protected Session session;
     protected Connection conn;
     private static MessageFactory messageFactory = MessageFactory.getInstance();
+    public static final int NUM_RETRIES = 1;
+    private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+    private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+    protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+        @Override
+        protected Session initialValue() {
+            try {
+                return createSession();
+            } catch (Exception e) {
+                LOG.error("Couldn't create JMS Session", e);
+                return null;
+            }
+        }
+
+        @Override
+        public void remove() {
+            if (get() != null) {
+                try {
+                    get().close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS session, ignored error", e);
+                }
+            }
+            super.remove();
+        }
+    };
 
     /**
      * Create message bus connection and session in constructor.
      */
     public NotificationListener(final Configuration conf) {
-
         super(conf);
-        createConnection();
+        testAndCreateConnection();
     }
 
     private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
         // Subscriber can get notification about drop of a database in HCAT
         // by listening on a topic named "HCAT" and message selector string
         // as "HCAT_EVENT = HCAT_DROP_DATABASE"
-        if (dbEvent.getStatus())  {
+        if (dbEvent.getStatus()) {
             String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
             send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
         }
@@ -216,7 +241,7 @@ public class NotificationListener extend
         }
     }
 
-    private String getTopicPrefix(HiveConf conf) {
+    private String getTopicPrefix(Configuration conf) {
         return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
             HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
     }
@@ -253,86 +278,102 @@ public class NotificationListener extend
      * @param topicName is the name on message broker on which message is sent.
      */
     protected void send(HCatEventMessage hCatEventMessage, String topicName) {
-        try {
-            if(null == session){
-                // this will happen, if we never able to establish a connection.
-                createConnection();
-                if (null == session){
-                    // Still not successful, return from here.
-                    LOG.error("Invalid session. Failed to send message on topic: " +
-                            topicName + " event: " + hCatEventMessage.getEventType());
-                    return;
-                }
-            }
-
-            Destination topic = getTopic(topicName);
+        send(hCatEventMessage, topicName, NUM_RETRIES);
+    }
 
-            if (null == topic){
-                // Still not successful, return from here.
-                LOG.error("Invalid session. Failed to send message on topic: " +
-                        topicName + " event: " + hCatEventMessage.getEventType());
-                return;
+    /**
+     * @param hCatEventMessage The HCatEventMessage being sent over JMS. This method is thread-safe.
+     * @param topicName The name of the topic on the message broker to which the message is sent.
+     * @param retries the number of retry attempts
+     */
+    protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+        try {
+            if (session.get() == null) {
+                // Need to reconnect
+                throw new JMSException("Invalid JMS session");
             }
-
-            MessageProducer producer = session.createProducer(topic);
-            Message msg = session.createTextMessage(hCatEventMessage.toString());
+            Destination topic = createTopic(topicName);
+            Message msg = session.get().createTextMessage(hCatEventMessage.toString());
 
             msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+            MessageProducer producer = createProducer(topic);
             producer.send(msg);
             // Message must be transacted before we return.
-            session.commit();
-        }
-        catch(Exception e){
-            // Gobble up the exception. Message delivery is best effort.
-            LOG.error("Failed to send message on topic: " + topicName +
-                    " event: " + hCatEventMessage.getEventType(), e);
+            session.get().commit();
+        } catch (Exception e) {
+            if (retries >= 0) {
+                // this may happen if we were able to establish a connection once, but it's no longer valid
+                LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+                testAndCreateConnection();
+                send(hCatEventMessage, topicName, retries - 1);
+            } else {
+                // Gobble up the exception. Message delivery is best effort.
+                LOG.error("Failed to send message on topic: " + topicName +
+                    " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+            }
         }
     }
 
     /**
-     * Get the topic object for the topicName, it also tries to reconnect
-     * if the connection appears to be broken.
+     * Get the topic object for the topicName
      *
      * @param topicName The String identifying the message-topic.
      * @return A {@link Topic} object corresponding to the specified topicName.
      * @throws JMSException
      */
-    protected Topic getTopic(final String topicName) throws JMSException {
-        Topic topic;
+    protected Topic createTopic(final String topicName) throws JMSException {
+        return session.get().createTopic(topicName);
+    }
+
+    /**
+     * Does a health check on the connection by sending a dummy message.
+     * Creates the connection if the existing one is found to be bad.
+     * Also recreates the session.
+     */
+    protected synchronized void testAndCreateConnection() {
+        if (conn != null) {
+            // This method is reached when error occurs while sending msg, so the session must be bad
+            session.remove();
+            if (!isConnectionHealthy()) {
+                // I am the first thread to detect the error, cleanup old connection & reconnect
+                try {
+                    conn.close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS connection, ignored error", e);
+                }
+                conn = createConnection();
+            }
+        } else {
+            conn = createConnection();
+        }
         try {
-            // Topics are created on demand. If it doesn't exist on broker it will
-            // be created when broker receives this message.
-            topic = session.createTopic(topicName);
-        } catch (IllegalStateException ise) {
-            // this will happen if we were able to establish connection once, but its no longer valid,
-            // ise is thrown, catch it and retry.
-            LOG.error("Seems like connection is lost. Retrying", ise);
-            createConnection();
-            topic = session.createTopic(topicName);
+            session.set(createSession());
+        } catch (JMSException e) {
+            LOG.error("Couldn't create JMS session, ignored the error", e);
         }
-        return topic;
     }
 
-    protected void createConnection() {
-
+    /**
+     * Create the JMS connection
+     * @return newly created JMS connection
+     */
+    protected Connection createConnection() {
+        LOG.info("Will create new JMS connection");
         Context jndiCntxt;
+        Connection jmsConnection = null;
         try {
             jndiCntxt = new InitialContext();
-            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
-                .lookup("ConnectionFactory");
-            Connection conn = connFac.createConnection();
-            conn.start();
-            conn.setExceptionListener(new ExceptionListener() {
+            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+            jmsConnection = connFac.createConnection();
+            jmsConnection.start();
+            jmsConnection.setExceptionListener(new ExceptionListener() {
                 @Override
                 public void onException(JMSException jmse) {
-                    LOG.error(jmse.toString());
+                    LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
                 }
             });
-            // We want message to be sent when session commits, thus we run in
-            // transacted mode.
-            session = conn.createSession(true, Session.SESSION_TRANSACTED);
         } catch (NamingException e) {
             LOG.error("JNDI error while setting up Message Bus connection. "
                 + "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
         } catch (Throwable t) {
             LOG.error("Unable to connect to JMS provider", t);
         }
+        return jmsConnection;
+    }
+
+    /**
+     * Send a dummy message to probe if the JMS connection is healthy
+     * @return true if connection is healthy, false otherwise
+     */
+    protected boolean isConnectionHealthy() {
+        try {
+            Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+            MessageProducer producer = createProducer(topic);
+            Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+            producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a JMS session
+     * @return newly created JMS session
+     * @throws JMSException
+     */
+    protected Session createSession() throws JMSException {
+        // We want message to be sent when session commits, thus we run in
+        // transacted mode.
+        return conn.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    /**
+     * Create a JMS producer
+     * @param topic
+     * @return newly created message producer
+     * @throws JMSException
+     */
+    protected MessageProducer createProducer(Destination topic) throws JMSException {
+        return session.get().createProducer(topic);
     }
 
     @Override
     protected void finalize() throws Throwable {
-        // Close the connection before dying.
-        try {
-            if (null != session)
-                session.close();
-            if (conn != null) {
+        if (conn != null) {
+            try {
                 conn.close();
+            } catch (Exception e) {
+                LOG.error("Couldn't close jms connection, ignored the error", e);
             }
-
-        } catch (Exception ignore) {
-            LOG.info("Failed to close message bus connection.", ignore);
         }
     }
 

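The heart of this NotificationListener change: the shared Session field becomes a per-thread Session (JMS sessions are not thread-safe), and a failed send gets one health-checked reconnect before the message is dropped. A condensed, standalone sketch of that pattern, assuming only a JNDI-resolvable "ConnectionFactory" as the listener itself does; the class and method names below are illustrative, not part of the commit:

    import javax.jms.*;
    import javax.naming.InitialContext;

    public class RetryingJmsSender {
        private Connection conn;

        // One Session per thread; send() treats a null session as "reconnect".
        private final ThreadLocal<Session> session = new ThreadLocal<Session>() {
            @Override protected Session initialValue() {
                try {
                    // Transacted: the message is only delivered on commit().
                    return conn.createSession(true, Session.SESSION_TRANSACTED);
                } catch (JMSException e) {
                    return null;
                }
            }
        };

        public RetryingJmsSender() throws Exception {
            conn = connect();
        }

        private Connection connect() throws Exception {
            ConnectionFactory factory =
                (ConnectionFactory) new InitialContext().lookup("ConnectionFactory");
            Connection c = factory.createConnection();
            c.start();
            return c;
        }

        public void send(String topicName, String body, int retries) {
            try {
                Session s = session.get();
                if (s == null) throw new JMSException("Invalid JMS session");
                MessageProducer producer = s.createProducer(s.createTopic(topicName));
                producer.send(s.createTextMessage(body));
                s.commit();
            } catch (Exception e) {
                if (retries > 0) {
                    reconnect();
                    send(topicName, body, retries - 1);
                }
                // else: delivery is best effort, as in the listener; drop it
            }
        }

        private synchronized void reconnect() {
            session.remove();   // drop this thread's (possibly broken) session
            try {
                if (conn != null) conn.close();
                conn = connect();
            } catch (Exception e) {
                // next send() will fail and burn another retry
            }
        }
    }
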
Modified: hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Wed May  8 04:43:16 2013
@@ -699,7 +699,9 @@ sub compare
     }
 
     
-    if (defined $testCmd->{'check_job_created'} || defined  $testCmd->{'check_job_complete'}) {
+    if ( (defined $testCmd->{'check_job_created'})
+         || (defined $testCmd->{'check_job_complete'})
+         || (defined $testCmd->{'check_job_exit_value'}) ) {
       my $jobid = $json_hash->{'id'};
       if (!defined $jobid) {
         print $log "$0::$subName INFO check failed: " 
@@ -714,7 +716,7 @@ sub compare
             . "jobresult not defined ";
           $result = 0;
         }
-        if ($testCmd->{'check_job_complete'}) {
+        if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
           my $jobComplete;
           my $NUM_RETRIES = 60;
           my $SLEEP_BETWEEN_RETRIES = 5;
@@ -736,11 +738,21 @@ sub compare
             $result = 0;
           } else { 
             # job has completed, check the runState value
-            my $runState = $res_hash->{'status'}->{'runState'};
-            my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
-            if ( (!defined $runState) || $runState ne $runStateVal) {
-              print $log "check_job_complete failed. got runState  $runState,  expected  $runStateVal";
-              $result = 0;
+            if (defined($testCmd->{'check_job_complete'})) {
+              my $runState = $res_hash->{'status'}->{'runState'};
+              my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
+              if ( (!defined $runState) || $runState ne $runStateVal) {
+                print $log "check_job_complete failed. got runState  $runState,  expected  $runStateVal";
+                $result = 0;
+              }
+            }
+            if (defined($testCmd->{'check_job_exit_value'})) {
+              my $exitValue = $res_hash->{'exitValue'};
+              my $expectedExitValue = $testCmd->{'check_job_exit_value'};
+              if ( (!defined $exitValue) || $exitValue ne $expectedExitValue) {
+                print $log "check_job_exit_value failed. got exitValue $exitValue,  expected  $expectedExitValue";
+                $result = 0;
+              }
             }
           }
         }

Modified: hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/branches/HIVE-4115/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Wed May  8 04:43:16 2013
@@ -49,6 +49,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -107,6 +108,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
    ]
@@ -128,6 +130,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'FAILURE', 
+     'check_job_exit_value' => 8,
      'check_call_back' => 1,
     },
     {
@@ -140,7 +143,9 @@ $cfg = 
                                 #results
      'status_code' => 200,
      'check_job_created' => 1,
+     'check_job_exit_value' => 0,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -154,6 +159,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -167,6 +173,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -181,6 +188,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -197,6 +205,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -212,6 +221,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -257,6 +267,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'FAILURE', 
+     'check_job_exit_value' => 11,
 
     },
  
@@ -271,6 +282,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -285,6 +297,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -298,6 +311,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -312,6 +326,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -326,6 +341,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -340,6 +356,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
 
     },
@@ -356,6 +373,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
 
     },
 

Modified: hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java (original)
+++ hive/branches/HIVE-4115/jdbc/src/java/org/apache/hive/jdbc/HiveDatabaseMetaData.java Wed May  8 04:43:16 2013
@@ -28,12 +28,16 @@ import java.util.Comparator;
 import java.util.jar.Attributes;
 
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.thrift.TCLIService;
 import org.apache.hive.service.cli.thrift.TGetCatalogsReq;
 import org.apache.hive.service.cli.thrift.TGetCatalogsResp;
 import org.apache.hive.service.cli.thrift.TGetColumnsReq;
 import org.apache.hive.service.cli.thrift.TGetColumnsResp;
 import org.apache.hive.service.cli.thrift.TGetFunctionsReq;
 import org.apache.hive.service.cli.thrift.TGetFunctionsResp;
+import org.apache.hive.service.cli.thrift.TGetInfoReq;
+import org.apache.hive.service.cli.thrift.TGetInfoResp;
 import org.apache.hive.service.cli.thrift.TGetSchemasReq;
 import org.apache.hive.service.cli.thrift.TGetSchemasResp;
 import org.apache.hive.service.cli.thrift.TGetTableTypesReq;
@@ -42,7 +46,6 @@ import org.apache.hive.service.cli.thrif
 import org.apache.hive.service.cli.thrift.TGetTablesResp;
 import org.apache.hive.service.cli.thrift.TGetTypeInfoReq;
 import org.apache.hive.service.cli.thrift.TGetTypeInfoResp;
-import org.apache.hive.service.cli.thrift.TCLIService;
 import org.apache.hive.service.cli.thrift.TSessionHandle;
 import org.apache.thrift.TException;
 
@@ -249,8 +252,17 @@ public class HiveDatabaseMetaData implem
   }
 
   public String getDatabaseProductVersion() throws SQLException {
-    // TODO: Fetch this from the server side
-    return "0.10.0";
+
+    TGetInfoReq req = new TGetInfoReq(sessHandle, GetInfoType.CLI_DBMS_VER.toTGetInfoType());
+    TGetInfoResp resp;
+    try {
+      resp = client.GetInfo(req);
+    } catch (TException e) {
+      throw new SQLException(e.getMessage(), "08S01", e);
+    }
+    Utils.verifySuccess(resp.getStatus());
+
+    return resp.getInfoValue().getStringValue();
   }
 
   public int getDefaultTransactionIsolation() throws SQLException {

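On the client side the change surfaces through standard JDBC metadata. A minimal usage sketch; the URL, user, and driver registration are the usual HiveServer2 boilerplate, not part of this diff:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;

    public class VersionCheck {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "user", "");
            DatabaseMetaData meta = con.getMetaData();
            // Now answered by the server through a TGetInfoReq(CLI_DBMS_VER)
            // round trip instead of the hard-coded "0.10.0".
            System.out.println(meta.getDatabaseProductVersion());
            con.close();
        }
    }
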
Modified: hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/branches/HIVE-4115/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java Wed May  8 04:43:16 2013
@@ -35,11 +35,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.common.util.HiveVersionInfo;
 
 /**
  * TestJdbcDriver2
@@ -833,7 +835,11 @@ public class TestJdbcDriver2 extends Tes
     DatabaseMetaData meta = con.getMetaData();
 
     assertEquals("Hive", meta.getDatabaseProductName());
-    assertEquals("0.10.0", meta.getDatabaseProductVersion());
+    assertEquals(HiveVersionInfo.getVersion(), meta.getDatabaseProductVersion());
+    assertEquals(System.getProperty("hive.version"), meta.getDatabaseProductVersion());
+    assertTrue("verifying hive version pattern. got " + meta.getDatabaseProductVersion(),
+        Pattern.matches("\\d+\\.\\d+\\.\\d+.*", meta.getDatabaseProductVersion()) );
+
     assertEquals(DatabaseMetaData.sqlStateSQL99, meta.getSQLStateType());
     assertFalse(meta.supportsCatalogsInTableDefinitions());
     assertFalse(meta.supportsSchemasInTableDefinitions());

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed May  8 04:43:16 2013
@@ -1060,6 +1060,10 @@ public class Driver implements CommandPr
 
     conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
     conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
+
+    conf.set("mapreduce.workflow.id", "hive_"+queryId);
+    conf.set("mapreduce.workflow.name", queryStr);
+
     maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
 
     try {
@@ -1338,6 +1342,8 @@ public class Driver implements CommandPr
       if (noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
       }
+      conf.set("mapreduce.workflow.node.name", tsk.getId());
+      Utilities.setWorkflowAdjacencies(conf, plan);
       cxt.incCurJobNo(1);
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
     }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Wed May  8 04:43:16 2013
@@ -53,7 +53,6 @@ public abstract class AbstractMapJoinOpe
   protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
 
   protected transient byte posBigTable = -1; // one of the tables that is not in memory
-  transient int mapJoinRowsKey; // rows for a given key
 
   protected transient RowContainer<ArrayList<Object>> emptyList = null;
 
@@ -104,9 +103,6 @@ public abstract class AbstractMapJoinOpe
         !hasFilter(posBigTable), reporter);
     storage[posBigTable] = bigPosRC;
 
-    mapJoinRowsKey = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-
     List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
         .getAllStructFieldRefs();
     if (conf.getOutputColumnNames().size() < structFields.size()) {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java Wed May  8 04:43:16 2013
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
 
 /**
  * FunctionInfo.
@@ -32,6 +34,8 @@ public class FunctionInfo {
 
   private final boolean isNative;
 
+  private final boolean isInternalTableFunction;
+
   private final String displayName;
 
   private GenericUDF genericUDF;
@@ -40,11 +44,14 @@ public class FunctionInfo {
 
   private GenericUDAFResolver genericUDAFResolver;
 
+  private Class<? extends TableFunctionResolver>  tableFunctionResolver;
+
   public FunctionInfo(boolean isNative, String displayName,
       GenericUDF genericUDF) {
     this.isNative = isNative;
     this.displayName = displayName;
     this.genericUDF = genericUDF;
+    this.isInternalTableFunction = false;
   }
 
   public FunctionInfo(boolean isNative, String displayName,
@@ -52,6 +59,7 @@ public class FunctionInfo {
     this.isNative = isNative;
     this.displayName = displayName;
     this.genericUDAFResolver = genericUDAFResolver;
+    this.isInternalTableFunction = false;
   }
 
   public FunctionInfo(boolean isNative, String displayName,
@@ -59,6 +67,16 @@ public class FunctionInfo {
     this.isNative = isNative;
     this.displayName = displayName;
     this.genericUDTF = genericUDTF;
+    this.isInternalTableFunction = false;
+  }
+
+  public FunctionInfo(String displayName, Class<? extends TableFunctionResolver> tFnCls)
+  {
+    this.displayName = displayName;
+    this.tableFunctionResolver = tFnCls;
+    PartitionTableFunctionDescription def = tableFunctionResolver.getAnnotation(PartitionTableFunctionDescription.class);
+    this.isNative = (def == null) ? false : def.isInternal();
+    this.isInternalTableFunction = isNative;
   }
 
   /**
@@ -90,6 +108,8 @@ public class FunctionInfo {
     return genericUDAFResolver;
   }
 
+
+
   /**
    * Get the Class of the UDF.
    */
@@ -109,6 +129,9 @@ public class FunctionInfo {
     } else if (isGenericUDTF()) {
       return genericUDTF.getClass();
     }
+    if(isTableFunction()) {
+      return this.tableFunctionResolver;
+    }
     return null;
   }
 
@@ -131,6 +154,14 @@ public class FunctionInfo {
   }
 
   /**
+   * Internal table functions cannot be used in the language.
+   * {@link WindowingTableFunction}
+   */
+  public boolean isInternalTableFunction() {
+    return isInternalTableFunction;
+  }
+
+  /**
    * @return TRUE if the function is a GenericUDF
    */
   public boolean isGenericUDF() {
@@ -150,4 +181,11 @@ public class FunctionInfo {
   public boolean isGenericUDTF() {
     return null != genericUDTF;
   }
+
+  /**
+   * @return TRUE if the function is a Table Function
+   */
+  public boolean isTableFunction() {
+    return null != tableFunctionResolver;
+  }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed May  8 04:43:16 2013
@@ -182,7 +182,6 @@ public final class FunctionRegistry {
   public static final String NOOP_TABLE_FUNCTION = "noop";
   public static final String NOOP_MAP_TABLE_FUNCTION = "noopwithmap";
 
-  static Map<String, PTFFunctionInfo> tableFunctions = Collections.synchronizedMap(new LinkedHashMap<String, PTFFunctionInfo>());
   static Map<String, WindowFunctionInfo> windowFunctions = Collections.synchronizedMap(new LinkedHashMap<String, WindowFunctionInfo>());
 
   /*
@@ -1294,6 +1293,9 @@ public final class FunctionRegistry {
       FunctionRegistry.registerTemporaryGenericUDAF(
         functionName, (GenericUDAFResolver)
         ReflectionUtils.newInstance(udfClass, null));
+    } else if(TableFunctionResolver.class.isAssignableFrom(udfClass)) {
+      FunctionRegistry.registerTableFunction(
+        functionName, (Class<? extends TableFunctionResolver>)udfClass);
     } else {
       return false;
     }
@@ -1406,14 +1408,17 @@ public final class FunctionRegistry {
 
   public static boolean isTableFunction(String name)
   {
-    PTFFunctionInfo tFInfo = tableFunctions.get(name.toLowerCase());
-     return tFInfo != null && !tFInfo.isInternal();
+    FunctionInfo tFInfo = mFunctions.get(name.toLowerCase());
+    return tFInfo != null && !tFInfo.isInternalTableFunction() && tFInfo.isTableFunction();
   }
 
   public static TableFunctionResolver getTableFunctionResolver(String name)
   {
-    PTFFunctionInfo tfInfo = tableFunctions.get(name.toLowerCase());
-    return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionResolver(), null);
+    FunctionInfo tfInfo = mFunctions.get(name.toLowerCase());
+    if(tfInfo.isTableFunction()) {
+      return (TableFunctionResolver) ReflectionUtils.newInstance(tfInfo.getFunctionClass(), null);
+    }
+    return null;
   }
 
   public static TableFunctionResolver getWindowingTableFunction()
@@ -1428,8 +1433,8 @@ public final class FunctionRegistry {
 
   public static void registerTableFunction(String name, Class<? extends TableFunctionResolver> tFnCls)
   {
-    PTFFunctionInfo tInfo = new PTFFunctionInfo(name, tFnCls);
-    tableFunctions.put(name.toLowerCase(), tInfo);
+    FunctionInfo tInfo = new FunctionInfo(name, tFnCls);
+    mFunctions.put(name.toLowerCase(), tInfo);
   }
 
 }

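With PTFFunctionInfo gone, table functions now live in the same mFunctions map as every other function, wrapped in the extended FunctionInfo. A rough usage sketch against the methods shown above; MyResolver stands in for a user-supplied TableFunctionResolver subclass:

    // Hypothetical registration and lookup of a partitioned table function.
    FunctionRegistry.registerTableFunction("my_ptf", MyResolver.class);

    if (FunctionRegistry.isTableFunction("my_ptf")) {
        // Reflectively instantiates MyResolver from its FunctionInfo entry;
        // internal functions (e.g. windowing) are filtered out by the check above.
        TableFunctionResolver resolver =
            FunctionRegistry.getTableFunctionResolver("my_ptf");
    }
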
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed May  8 04:43:16 2013
@@ -151,35 +151,13 @@ public class GroupByOperator extends Ope
   private List<FastBitSet> groupingSetsBitSet;
   transient private List<Object> newKeysGroupingSets;
 
-  /**
-   * This is used to store the position and field names for variable length
-   * fields.
-   **/
-  class varLenFields {
-    int aggrPos;
-    List<Field> fields;
-
-    varLenFields(int aggrPos, List<Field> fields) {
-      this.aggrPos = aggrPos;
-      this.fields = fields;
-    }
-
-    int getAggrPos() {
-      return aggrPos;
-    }
-
-    List<Field> getFields() {
-      return fields;
-    }
-  };
-
   // for these positions, some variable primitive type (String) is used, so size
   // cannot be estimated. sample it at runtime.
   transient List<Integer> keyPositionsSize;
 
   // for these positions, some variable primitive type (String) is used for the
   // aggregation classes
-  transient List<varLenFields> aggrPositions;
+  transient List<Field>[] aggrPositions;
 
   transient int fixedRowSize;
   transient long maxHashTblMemory;
@@ -383,7 +361,7 @@ public class GroupByOperator extends Ope
       aggregations = newAggregations();
       hashAggr = true;
       keyPositionsSize = new ArrayList<Integer>();
-      aggrPositions = new ArrayList<varLenFields>();
+      aggrPositions = new List[aggregations.length];
       groupbyMapAggrInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
 
@@ -523,21 +501,10 @@ public class GroupByOperator extends Ope
     }
 
     if (c.isInstance(new String()) || c.isInstance(new ByteArrayRef())) {
-      int idx = 0;
-      varLenFields v = null;
-      for (idx = 0; idx < aggrPositions.size(); idx++) {
-        v = aggrPositions.get(idx);
-        if (v.getAggrPos() == pos) {
-          break;
-        }
+      if (aggrPositions[pos] == null) {
+        aggrPositions[pos] = new ArrayList<Field>();
       }
-
-      if (idx == aggrPositions.size()) {
-        v = new varLenFields(pos, new ArrayList<Field>());
-        aggrPositions.add(v);
-      }
-
-      v.getFields().add(f);
+      aggrPositions[pos].add(f);
       return javaObjectOverHead;
     }
 
@@ -582,9 +549,11 @@ public class GroupByOperator extends Ope
     for (int i = 0; i < aggregationEvaluators.length; i++) {
 
       fixedRowSize += javaObjectOverHead;
-      Class<? extends AggregationBuffer> agg = aggregationEvaluators[i]
-          .getNewAggregationBuffer().getClass();
-      Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
+      AggregationBuffer agg = aggregationEvaluators[i].getNewAggregationBuffer();
+      if (GenericUDAFEvaluator.isEstimable(agg)) {
+        continue;
+      }
+      Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg.getClass());
       for (Field f : fArr) {
         fixedRowSize += getSize(i, f.getType(), f);
       }
@@ -968,29 +937,15 @@ public class GroupByOperator extends Ope
         }
       }
 
-      AggregationBuffer[] aggs = null;
-      if (aggrPositions.size() > 0) {
-        KeyWrapper newKeyProber = newKeys.copyKey();
-        aggs = hashAggregations.get(newKeyProber);
-      }
-
-      for (varLenFields v : aggrPositions) {
-        int aggrPos = v.getAggrPos();
-        List<Field> fieldsVarLen = v.getFields();
-        AggregationBuffer agg = aggs[aggrPos];
-
-        try {
-          for (Field f : fieldsVarLen) {
-            Object o = f.get(agg);
-            if (o instanceof String){
-              totalVariableSize += ((String)o).length();
-            }
-            else if (o instanceof ByteArrayRef){
-              totalVariableSize += ((ByteArrayRef)o).getData().length;
-            }
-          }
-        } catch (IllegalAccessException e) {
-          assert false;
+      AggregationBuffer[] aggs = hashAggregations.get(newKeys);
+      for (int i = 0; i < aggs.length; i++) {
+        AggregationBuffer agg = aggs[i];
+        if (GenericUDAFEvaluator.isEstimable(agg)) {
+          totalVariableSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer)agg).estimate();
+          continue;
+        }
+        if (aggrPositions[i] != null) {
+          totalVariableSize += estimateSize(agg, aggrPositions[i]);
         }
       }
 
@@ -1010,6 +965,24 @@ public class GroupByOperator extends Ope
     return false;
   }
 
+  private int estimateSize(AggregationBuffer agg, List<Field> fields) {
+    int length = 0;
+    for (Field f : fields) {
+      try {
+        Object o = f.get(agg);
+        if (o instanceof String){
+          length += ((String)o).length();
+        }
+        else if (o instanceof ByteArrayRef){
+          length += ((ByteArrayRef)o).getData().length;
+        }
+      } catch (Exception e) {
+        // continue.. null out the field?
+      }
+    }
+    return length;
+  }
+
   private void flush(boolean complete) throws HiveException {
 
     countAfterReport = 0;

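The memory estimate for hash aggregation now short-circuits when a buffer can report its own footprint: GenericUDAFEvaluator.isEstimable(agg) routes to AbstractAggregationBuffer.estimate(), and the reflective walk over buffer fields remains only as a fallback. A hedged sketch of what an estimable buffer looks like; AbstractAggregationBuffer and estimate() appear in this diff, while the annotation shape and the byte count are assumptions for illustration:

    // Inside a GenericUDAFEvaluator implementation (sketch):
    @AggregationType(estimable = true)  // assumed marker, see lead-in
    static class SumLongAgg extends GenericUDAFEvaluator.AbstractAggregationBuffer {
        boolean empty;
        long sum;

        @Override
        public int estimate() {
            // Fixed-size state, so a constant estimate; variable-length state
            // (Strings, byte arrays) would be measured here instead. Real
            // evaluators size this via the new JavaDataModel helper.
            return 16; // illustrative byte count
        }
    }
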
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Wed May  8 04:43:16 2013
@@ -70,7 +70,6 @@ public class HashTableSinkOperator exten
   protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
 
   protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
-  transient int mapJoinRowsKey; // rows for a given key
 
   protected transient RowContainer<ArrayList<Object>> emptyList = null;
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed May  8 04:43:16 2013
@@ -121,6 +121,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.api.Adjacency;
+import org.apache.hadoop.hive.ql.plan.api.Graph;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -229,6 +231,25 @@ public final class Utilities {
     }
   }
 
+  public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
+    try {
+      Graph stageGraph = plan.getQueryPlan().getStageGraph();
+      if (stageGraph == null)
+        return;
+      List<Adjacency> adjList = stageGraph.getAdjacencyList();
+      if (adjList == null)
+        return;
+      for (Adjacency adj : adjList) {
+        List<String> children = adj.getChildren();
+        if (children == null || children.isEmpty())
+          return;
+        conf.setStrings("mapreduce.workflow.adjacency."+adj.getNode(),
+            children.toArray(new String[children.size()]));
+      }
+    } catch (IOException e) {
+    }
+  }
+
   public static List<String> getFieldSchemaString(List<FieldSchema> fl) {
     if (fl == null) {
       return null;

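Together with the Driver.java hunk earlier in this commit, this publishes the query's stage DAG as Hadoop's mapreduce.workflow.* job properties so external tools can correlate MR jobs back to the Hive query. A minimal sketch of the resulting Configuration layout; the property keys are the ones set by the diff, the stage IDs and query are made up:

    import org.apache.hadoop.conf.Configuration;

    public class WorkflowConfSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // One workflow id/name pair per query (set in Driver)...
            conf.set("mapreduce.workflow.id", "hive_20130508044316_example"); // hive_<queryId>
            conf.set("mapreduce.workflow.name", "select count(*) from t");
            // ...one node name per launched task...
            conf.set("mapreduce.workflow.node.name", "Stage-1");
            // ...and one adjacency list per stage-graph node:
            conf.setStrings("mapreduce.workflow.adjacency.Stage-1",
                "Stage-2", "Stage-3");
        }
    }
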
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed May  8 04:43:16 2013
@@ -140,7 +140,200 @@ import org.apache.hadoop.util.Reflection
  * </ul>
  * </li>
  * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
  *
+ * rcfile ::=
+ *   <file-header>
+ *   <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ *   <file-version-header>
+ *   <file-key-class-name>              (only exists if version is seq6)
+ *   <file-value-class-name>            (only exists if version is seq6)
+ *   <file-is-compressed>
+ *   <file-is-block-compressed>         (only exists if version is seq6)
+ *   [<file-compression-codec-class>]
+ *   <file-header-metadata>
+ *   <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has since introduced a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ *                     |   Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the lengths and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ *   <rowgroup-header>
+ *   <rowgroup-key-data>
+ *   <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ *   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ *   <rowgroup-record-length>
+ *   <rowgroup-key-length>
+ *   <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ *   <rowgroup-num-rows>
+ *   <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ *   <column-buffer-length>
+ *   <column-buffer-uncompressed-length>
+ *   <column-key-buffer-length>
+ *   <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer.
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-value-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
  */
 public class RCFile {
 
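The run-length scheme the grammar describes for the column-key-buffer is easy to unpack in code. A minimal decode sketch, assuming the VInts have already been read into an int array and that repeats are stored as the bitwise complement of the repeat count, as the comments above state (so 1,~2,2 expands back to 1,1,1,2):

    import java.util.ArrayList;
    import java.util.List;

    public class RunLengthDecoder {
      // Expands RCFile-style field lengths: a negative entry ~n means
      // "repeat the previous length n more times".
      public static List<Integer> decodeFieldLengths(int[] encoded) {
        List<Integer> lengths = new ArrayList<Integer>();
        int previous = -1;
        for (int v : encoded) {
          if (v < 0) {
            int repetitions = ~v; // ~2 is stored as -3, and ~(-3) == 2
            for (int i = 0; i < repetitions; i++) {
              lengths.add(previous);
            }
          } else {
            lengths.add(v);
            previous = v;
          }
        }
        return lengths;
      }
    }

For example, decodeFieldLengths(new int[] {1, ~2, 2}) returns [1, 1, 1, 2].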
@@ -1385,21 +1578,31 @@ public class RCFile {
 
       try {
         seek(position + 4); // skip escape
-        in.readFully(syncCheck);
-        int syncLen = sync.length;
-        for (int i = 0; in.getPos() < end; i++) {
-          int j = 0;
-          for (; j < syncLen; j++) {
-            if (sync[j] != syncCheck[(i + j) % syncLen]) {
-              break;
+
+        int prefix = sync.length;
+        int n = conf.getInt("io.bytes.per.checksum", 512);
+        byte[] buffer = new byte[prefix+n];
+        n = (int)Math.min(n, end - in.getPos());
+        /* fill array with a pattern that will never match sync */
+        Arrays.fill(buffer, (byte)(~sync[0])); 
+        while(n > 0 && (in.getPos() + n) <= end) {
+          position = in.getPos();
+          in.readFully(buffer, prefix, n);
+          /* the buffer has n+sync bytes */
+          for(int i = 0; i < n; i++) {
+            int j;
+            for(j = 0; j < sync.length && sync[j] == buffer[i+j]; j++) {
+              /* nothing */
+            }
+            if(j == sync.length) {
+              /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+              in.seek(position + i - SYNC_SIZE);
+              return;
             }
           }
-          if (j == syncLen) {
-            in.seek(in.getPos() - SYNC_SIZE); // position before
-            // sync
-            return;
-          }
-          syncCheck[i % syncLen] = in.readByte();
+          /* move the last 16 bytes to the prefix area */
+          System.arraycopy(buffer, buffer.length - prefix - 1, buffer, 0, prefix);
+          n = (int)Math.min(n, end - in.getPos());
         }
       } catch (ChecksumException e) { // checksum failure
         handleChecksumException(e);

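The rewritten sync search above replaces the original byte-at-a-time circular-buffer scan with a chunked one: with the default io.bytes.per.checksum of 512 and a 16-byte sync hash, each pass reads up to 512 bytes into a buffer behind a 16-byte prefix region, scans for the hash, and then carries the buffer's tail into the prefix before the next read, so a sync marker straddling a chunk boundary is still found. Pre-filling the buffer with the complement of sync[0] guarantees the untouched prefix can never produce a false match on the first pass.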
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java Wed May  8 04:43:16 2013
@@ -44,7 +44,7 @@ public class ColumnAccessAnalyzer {
     for (TableScanOperator op : topOps.keySet()) {
       Table table = topOps.get(op);
       String tableName = table.getCompleteName();
-      List<FieldSchema> tableCols = table.getAllCols();
+      List<FieldSchema> tableCols = table.getCols();
       for (int i : op.getNeededColumnIDs()) {
         columnAccessInfo.add(tableName, tableCols.get(i).getName());
       }

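The switch to getCols() presumably fixes column-access reporting: getNeededColumnIDs() holds positions into the table's data columns, while getAllCols() mixes partition columns into the same list, so a positional lookup there can resolve to the wrong column name.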
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Wed May  8 04:43:16 2013
@@ -172,7 +172,7 @@ tableSample
 tableSource
 @init { gParent.msgs.push("table source"); }
 @after { gParent.msgs.pop(); }
-    : tabname=tableName (ts=tableSample)? (alias=identifier)?
+    : tabname=tableName (ts=tableSample)? (KW_AS? alias=identifier)?
     -> ^(TOK_TABREF $tabname $ts? $alias?)
     ;
 

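With the optional KW_AS token, a table alias can now be introduced with the ANSI-style AS keyword. Both of the following now parse to the same TOK_TABREF tree (the table and column names are illustrative):

    SELECT t.key FROM src t;
    SELECT t.key FROM src AS t;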
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Wed May  8 04:43:16 2013
@@ -1878,7 +1878,7 @@ destination
 @init { msgs.push("destination specification"); }
 @after { msgs.pop(); }
    :
-     KW_LOCAL KW_DIRECTORY StringLiteral -> ^(TOK_LOCAL_DIR StringLiteral)
+     KW_LOCAL KW_DIRECTORY StringLiteral tableRowFormat? tableFileFormat? -> ^(TOK_LOCAL_DIR StringLiteral tableRowFormat? tableFileFormat?)
    | KW_DIRECTORY StringLiteral -> ^(TOK_DIR StringLiteral)
    | KW_TABLE tableOrPartition -> tableOrPartition
    ;

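The extended destination rule lets INSERT OVERWRITE LOCAL DIRECTORY carry its own row format and file format, in the style of a CREATE TABLE clause. For example (the path and delimiter are illustrative):

    INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ':'
    STORED AS TEXTFILE
    SELECT key, value FROM src;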
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Wed May  8 04:43:16 2013
@@ -51,6 +51,7 @@ public class QB {
   private boolean isQuery;
   private boolean isAnalyzeRewrite;
   private CreateTableDesc tblDesc = null; // table descriptor of the final
+  private CreateTableDesc localDirectoryDesc = null;
 
   // used by PTFs
   /*
@@ -227,6 +228,14 @@ public class QB {
     tblDesc = desc;
   }
 
+  public CreateTableDesc getLocalDirectoryDesc() {
+    return localDirectoryDesc;
+  }
+
+  public void setLocalDirectoryDesc(CreateTableDesc localDirectoryDesc) {
+    this.localDirectoryDesc = localDirectoryDesc;
+  }
+
   /**
    * Whether this QB is for a CREATE-TABLE-AS-SELECT.
    */

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed May  8 04:43:16 2013
@@ -1192,6 +1192,10 @@ public class SemanticAnalyzer extends Ba
         }
       }
 
+      RowFormatParams rowFormatParams = new RowFormatParams();
+      AnalyzeCreateCommonVars shared = new AnalyzeCreateCommonVars();
+      StorageFormat storageFormat = new StorageFormat();
+
       LOG.info("Get metadata for destination tables");
       // Go over all the destination structures and populate the related
       // metadata
@@ -1279,6 +1283,45 @@ public class SemanticAnalyzer extends Ba
           }
           qb.getMetaData().setDestForAlias(name, fname,
               (ast.getToken().getType() == HiveParser.TOK_DIR));
+
+          CreateTableDesc localDirectoryDesc = new CreateTableDesc();
+          boolean localDirectoryDescIsSet = false;
+          int numCh = ast.getChildCount();
+          for (int num = 1; num < numCh; num++) {
+            ASTNode child = (ASTNode) ast.getChild(num);
+            if (child != null) {
+              switch (child.getToken().getType()) {
+                case HiveParser.TOK_TABLEROWFORMAT:
+                  rowFormatParams.analyzeRowFormat(shared, child);
+                  localDirectoryDesc.setFieldDelim(rowFormatParams.fieldDelim);
+                  localDirectoryDesc.setLineDelim(rowFormatParams.lineDelim);
+                  localDirectoryDesc.setCollItemDelim(rowFormatParams.collItemDelim);
+                  localDirectoryDesc.setMapKeyDelim(rowFormatParams.mapKeyDelim);
+                  localDirectoryDesc.setFieldEscape(rowFormatParams.fieldEscape);
+                  localDirectoryDescIsSet = true;
+                  break;
+                case HiveParser.TOK_TABLESERIALIZER:
+                  ASTNode serdeChild = (ASTNode) child.getChild(0);
+                  shared.serde = unescapeSQLString(serdeChild.getChild(0).getText());
+                  localDirectoryDesc.setSerName(shared.serde);
+                  localDirectoryDescIsSet = true;
+                  break;
+                case HiveParser.TOK_TBLSEQUENCEFILE:
+                case HiveParser.TOK_TBLTEXTFILE:
+                case HiveParser.TOK_TBLRCFILE:
+                case HiveParser.TOK_TBLORCFILE:
+                case HiveParser.TOK_TABLEFILEFORMAT:
+                  storageFormat.fillStorageFormat(child, shared);
+                  localDirectoryDesc.setOutputFormat(storageFormat.outputFormat);
+                  localDirectoryDesc.setSerName(shared.serde);
+                  localDirectoryDescIsSet = true;
+                  break;
+              }
+            }
+          }
+          if (localDirectoryDescIsSet) {
+            qb.setLocalDirectoryDesc(localDirectoryDesc);
+          }
           break;
         }
         default:
@@ -5180,8 +5223,7 @@ public class SemanticAnalyzer extends Ba
           String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
           table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat);
         } else {
-          table_desc = PlanUtils.getDefaultTableDesc(Integer
-              .toString(Utilities.ctrlaCode), cols, colTypes, false);
+          table_desc = PlanUtils.getDefaultTableDesc(qb.getLocalDirectoryDesc(), cols, colTypes);
         }
       } else {
         table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Wed May  8 04:43:16 2013
@@ -100,6 +100,56 @@ public final class PlanUtils {
     }
   }
 
+  public static TableDesc getDefaultTableDesc(CreateTableDesc localDirectoryDesc,
+      String cols, String colTypes) {
+    TableDesc tableDesc = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols,
+        colTypes, false);
+    if (localDirectoryDesc == null) {
+      return tableDesc;
+    }
+
+    try {
+      if (localDirectoryDesc.getFieldDelim() != null) {
+        tableDesc.getProperties().setProperty(
+            serdeConstants.FIELD_DELIM, localDirectoryDesc.getFieldDelim());
+        tableDesc.getProperties().setProperty(
+            serdeConstants.SERIALIZATION_FORMAT, localDirectoryDesc.getFieldDelim());
+      }
+      if (localDirectoryDesc.getLineDelim() != null) {
+        tableDesc.getProperties().setProperty(
+            serdeConstants.LINE_DELIM, localDirectoryDesc.getLineDelim());
+      }
+      if (localDirectoryDesc.getCollItemDelim() != null) {
+        tableDesc.getProperties().setProperty(
+            serdeConstants.COLLECTION_DELIM, localDirectoryDesc.getCollItemDelim());
+      }
+      if (localDirectoryDesc.getMapKeyDelim() != null) {
+        tableDesc.getProperties().setProperty(
+            serdeConstants.MAPKEY_DELIM, localDirectoryDesc.getMapKeyDelim());
+      }
+      if (localDirectoryDesc.getFieldEscape() != null) {
+        tableDesc.getProperties().setProperty(
+            serdeConstants.ESCAPE_CHAR, localDirectoryDesc.getFieldEscape());
+      }
+      if (localDirectoryDesc.getSerName() != null) {
+        tableDesc.setSerdeClassName(localDirectoryDesc.getSerName());
+        tableDesc.getProperties().setProperty(
+            serdeConstants.SERIALIZATION_LIB, localDirectoryDesc.getSerName());
+        tableDesc.setDeserializerClass(
+            (Class<? extends Deserializer>) Class.forName(localDirectoryDesc.getSerName()));
+      }
+      if (localDirectoryDesc.getOutputFormat() != null) {
+        tableDesc.setOutputFileFormatClass(Class.forName(localDirectoryDesc.getOutputFormat()));
+      }
+    } catch (ClassNotFoundException e) {
+      // mimic the behaviour of CreateTableDesc's tableDesc creation:
+      // report the failure and return a null table description for the output.
+      e.printStackTrace();
+      return null;
+    }
+    return tableDesc;
+  }
+
   /**
    * Generate the table descriptor of MetadataTypedColumnsetSerDe with the
    * separatorCode and column names (comma separated string).

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java Wed May  8 04:43:16 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.UD
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -303,6 +304,14 @@ public class GenericUDAFAverage extends 
       }
     }
 
+    @AggregationType(estimable = true)
+    static class AverageAgg extends AbstractAggregationBuffer {
+      long count;
+      double sum;
+      @Override
+      public int estimate() { return JavaDataModel.PRIMITIVES2 * 2; }
+    };
+
     @Override
     public void reset(AggregationBuffer aggregation) throws HiveException {
       doReset((AverageAggregationBuffer<TYPE>)aggregation);

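An estimable aggregation buffer lets the execution engine ask for an approximate in-memory footprint instead of measuring it reflectively. A minimal sketch of the shape of the pattern, analogous to AverageAgg above (the Estimable interface and the 8-byte slot size stand in for Hive's AbstractAggregationBuffer and JavaDataModel, and are assumptions for illustration):

    public class EstimableBufferSketch {
      // stand-in for JavaDataModel.PRIMITIVES2, the slot size assumed
      // here for a long or double field
      static final int PRIMITIVES2 = 8;

      interface Estimable {
        int estimate(); // approximate heap bytes used by this buffer
      }

      // mirrors AverageAgg: one long and one double, i.e. two primitive2 slots
      static class AverageLikeBuffer implements Estimable {
        long count;
        double sum;
        public int estimate() {
          return PRIMITIVES2 * 2;
        }
      }

      public static void main(String[] args) {
        Estimable buf = new AverageLikeBuffer();
        // an operator could sum such estimates over all hash-table entries
        // to decide when an in-memory aggregation must flush
        System.out.println("estimated bytes: " + buf.estimate());
      }
    }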
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java Wed May  8 04:43:16 2013
@@ -147,7 +147,7 @@ public class GenericUDAFBridge extends A
     }
 
     /** class for storing UDAFEvaluator value. */
-    static class UDAFAgg implements AggregationBuffer {
+    static class UDAFAgg extends AbstractAggregationBuffer {
       UDAFEvaluator ueObject;
 
       UDAFAgg(UDAFEvaluator ueObject) {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java Wed May  8 04:43:16 2013
@@ -99,7 +99,7 @@ public class GenericUDAFCollectSet exten
       }
     }
     
-    static class MkArrayAggregationBuffer implements AggregationBuffer {
+    static class MkArrayAggregationBuffer extends AbstractAggregationBuffer {
       Set<Object> container;
     }
     

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java Wed May  8 04:43:16 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -185,11 +186,17 @@ public class GenericUDAFComputeStats ext
             foi);
     }
 
-    public static class BooleanStatsAgg implements AggregationBuffer {
+    @AggregationType(estimable = true)
+    public static class BooleanStatsAgg extends AbstractAggregationBuffer {
       public String columnType;                        /* Datatype of column */
       public long countTrues;  /* Count of number of true values seen so far */
       public long countFalses; /* Count of number of false values seen so far */
       public long countNulls;  /* Count of number of null values seen so far */
+      @Override
+      public int estimate() {
+        JavaDataModel model = JavaDataModel.get();
+        return model.primitive2() * 3 + model.lengthFor(columnType);
+      }
     };
 
     @Override
@@ -426,7 +433,9 @@ public class GenericUDAFComputeStats ext
       }
     }
 
-    public static class LongStatsAgg implements AggregationBuffer {
+
+    @AggregationType(estimable = true)
+    public static class LongStatsAgg extends AbstractAggregationBuffer {
       public String columnType;
       public long min;                              /* Minimum value seen so far */
       public long max;                              /* Maximum value seen so far */
@@ -434,6 +443,12 @@ public class GenericUDAFComputeStats ext
       public LongNumDistinctValueEstimator numDV;    /* Distinct value estimator */
       public boolean firstItem;                     /* First item in the aggBuf? */
       public int numBitVectors;
+      @Override
+      public int estimate() {
+        JavaDataModel model = JavaDataModel.get();
+        return model.primitive1() * 2 + model.primitive2() * 3 +
+            model.lengthFor(columnType) + model.lengthFor(numDV);
+      }
     };
 
     @Override
@@ -738,7 +753,8 @@ public class GenericUDAFComputeStats ext
       }
     }
 
-    public static class DoubleStatsAgg implements AggregationBuffer {
+    @AggregationType(estimable = true)
+    public static class DoubleStatsAgg extends AbstractAggregationBuffer {
       public String columnType;
       public double min;                            /* Minimum value seen so far */
       public double max;                            /* Maximum value seen so far */
@@ -746,6 +762,12 @@ public class GenericUDAFComputeStats ext
       public DoubleNumDistinctValueEstimator numDV;  /* Distinct value estimator */
       public boolean firstItem;                     /* First item in the aggBuf? */
       public int numBitVectors;
+      @Override
+      public int estimate() {
+        JavaDataModel model = JavaDataModel.get();
+        return model.primitive1() * 2 + model.primitive2() * 3 +
+            model.lengthFor(columnType) + model.lengthFor(numDV);
+      }
     };
 
     @Override
@@ -1061,7 +1083,8 @@ public class GenericUDAFComputeStats ext
       }
     }
 
-    public static class StringStatsAgg implements AggregationBuffer {
+    @AggregationType(estimable = true)
+    public static class StringStatsAgg extends AbstractAggregationBuffer {
       public String columnType;
       public long maxLength;                           /* Maximum length seen so far */
       public long sumLength;             /* Sum of lengths of all values seen so far */
@@ -1070,6 +1093,12 @@ public class GenericUDAFComputeStats ext
       public StringNumDistinctValueEstimator numDV;      /* Distinct value estimator */
       public int numBitVectors;
       public boolean firstItem;
+      @Override
+      public int estimate() {
+        JavaDataModel model = JavaDataModel.get();
+        return model.primitive1() * 2 + model.primitive2() * 4 +
+            model.lengthFor(columnType) + model.lengthFor(numDV);
+      }
     };
 
     @Override
@@ -1377,12 +1406,18 @@ public class GenericUDAFComputeStats ext
       }
     }
 
-    public static class BinaryStatsAgg implements AggregationBuffer {
+    @AggregationType(estimable = true)
+    public static class BinaryStatsAgg extends AbstractAggregationBuffer {
       public String columnType;
       public long maxLength;                           /* Maximum length seen so far */
       public long sumLength;             /* Sum of lengths of all values seen so far */
       public long count;                          /* Count of all values seen so far */
       public long countNulls;          /* Count of number of null values seen so far */
+      @Override
+      public int estimate() {
+        JavaDataModel model = JavaDataModel.get();
+        return model.primitive2() * 4 + model.lengthFor(columnType);
+      }
     };
 
     @Override

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFContextNGrams.java Wed May  8 04:43:16 2013
@@ -400,7 +400,7 @@ public class GenericUDAFContextNGrams im
 
 
     // Aggregation buffer methods. 
-    static class NGramAggBuf implements AggregationBuffer {
+    static class NGramAggBuf extends AbstractAggregationBuffer {
       ArrayList<String> context;
       NGramEstimator nge;
     };

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java?rev=1480158&r1=1480157&r2=1480158&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java Wed May  8 04:43:16 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -247,13 +248,16 @@ public class GenericUDAFCorrelation exte
       }
     }
 
-    static class StdAgg implements AggregationBuffer {
+    @AggregationType(estimable = true)
+    static class StdAgg extends AbstractAggregationBuffer {
       long count; // number n of elements
       double xavg; // average of x elements
       double yavg; // average of y elements
       double xvar; // n times the variance of x elements
       double yvar; // n times the variance of y elements
       double covar; // n times the covariance
+      @Override
+      public int estimate() { return JavaDataModel.PRIMITIVES2 * 6; }
     };
 
     @Override