Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/09 06:38:39 UTC

svn commit: r1480526 [1/2] - in /hive/branches/vectorization: ./ data/files/ hcatalog/build-support/ant/ hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/ hcatalog/src/test/ hcatalog/src/test/e2e/templeton/drivers/ hcatalog/src/tes...

Author: hashutosh
Date: Thu May  9 04:38:38 2013
New Revision: 1480526

URL: http://svn.apache.org/r1480526
Log:
Merged with trunk. Resolved conflicts in DynamicByteArray.java

Added:
    hive/branches/vectorization/data/files/orc_create.txt
      - copied unchanged from r1480524, hive/trunk/data/files/orc_create.txt
    hive/branches/vectorization/ql/src/test/queries/clientpositive/ctas_colname.q
      - copied unchanged from r1480524, hive/trunk/ql/src/test/queries/clientpositive/ctas_colname.q
    hive/branches/vectorization/ql/src/test/results/clientpositive/ctas_colname.q.out
      - copied unchanged from r1480524, hive/trunk/ql/src/test/results/clientpositive/ctas_colname.q.out
Removed:
    hive/branches/vectorization/hcatalog/src/test/.gitignore
Modified:
    hive/branches/vectorization/   (props changed)
    hive/branches/vectorization/build-common.xml
    hive/branches/vectorization/build.properties
    hive/branches/vectorization/build.xml
    hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml
    hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
    hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
    hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
    hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
    hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java
    hive/branches/vectorization/ql/src/test/queries/clientpositive/column_access_stats.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/orc_create.q
    hive/branches/vectorization/ql/src/test/resources/orc-file-dump.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/column_access_stats.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/orc_create.q.out
    hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
    hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/operation/Operation.java
    hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java

Propchange: hive/branches/vectorization/
------------------------------------------------------------------------------
  Merged /hive/branches/branch-0.11:r1480385,1480458
  Merged /hive/trunk:r1477794-1480524

Modified: hive/branches/vectorization/build-common.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build-common.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build-common.xml (original)
+++ hive/branches/vectorization/build-common.xml Thu May  9 04:38:38 2013
@@ -70,6 +70,10 @@
   <property name="hadoop.opts.23" value="-D mapreduce.framework.name=local" />
   <property name="hadoop.opts.20" value="" />
 
+  <condition property="test.halt.on.failure" value="no" else="yes">
+    <equals arg1="${test.continue.on.failure}" arg2="true"/>
+  </condition>
+
   <target name="set-test-classpath">
     <typedef name="distinctelementsclasspath" classname="org.apache.hadoop.hive.ant.DistinctElementsClassPath"
       classpath="${build.dir.hive}/anttasks/hive-anttasks-${version}.jar:${build.ivy.lib.dir}/default/commons-collections-${commons-collections.version}.jar:${build.ivy.lib.dir}/default/commons-lang-${commons-lang.version}.jar"/>
@@ -429,7 +433,8 @@
         <echo message="Test Classpath: ${hadoop.testcp}"/>
       </then>
     </if>
-    <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+
+    <junit showoutput="${test.output}" printsummary="yes" haltonfailure="${test.halt.on.failure}"
            fork="yes" maxmemory="${test.junit.maxmemory}" dir="${basedir}" timeout="${test.junit.timeout}"
            errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off">
       <jvmarg value="-XX:+HeapDumpOnOutOfMemoryError"/>

Modified: hive/branches/vectorization/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.properties?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build.properties (original)
+++ hive/branches/vectorization/build.properties Thu May  9 04:38:38 2013
@@ -75,7 +75,7 @@ common.jar=${hadoop.root}/lib/commons-ht
 # module names needed for build process
 iterate.hive.all=ant,shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
 iterate.hive.modules=shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
-iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,metastore,odbc,serde,service,hcatalog
+iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,beeline,metastore,odbc,serde,service,hcatalog
 iterate.hive.thrift=ql,service,metastore,serde
 iterate.hive.protobuf=ql
 iterate.hive.cpp=odbc
@@ -95,7 +95,7 @@ test.junit.timeout=43200000
 # ant test -Dtest.junit.exclude="**/Test*CliDriver.class,**/TestPartitions.class"
 test.junit.exclude=
 
-test.continue.on.failure=false
+test.continue.on.failure=true
 
 test.submodule.exclude=
 test.junit.maxmemory=512m
@@ -129,10 +129,6 @@ mvn.pom.dir=${build.dir.hive}/maven/poms
 mvn.license.dir=${build.dir.hive}/maven/licenses
 mvn.deploy.id=apache.snapshots.https
 mvn.deploy.url=https://repository.apache.org/content/repositories/snapshots
-#
-# unit test Properties
-#
-failonerror=false
 
 #
 # Data nucleus repository - needed for jdo2-api-2.3-ec.jar download

Modified: hive/branches/vectorization/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build.xml (original)
+++ hive/branches/vectorization/build.xml Thu May  9 04:38:38 2013
@@ -70,6 +70,10 @@
     <equals arg1="${mvn.publish.repo}" arg2="staging"/>
   </condition>
 
+  <condition property="test.halt.on.failure" value="no" else="yes">
+    <equals arg1="${test.continue.on.failure}" arg2="true"/>
+  </condition>
+
   <taskdef resource="net/sf/antcontrib/antcontrib.properties">
     <classpath>
       <pathelement location="${hive.root}/testlibs/ant-contrib-1.0b3.jar"/>
@@ -293,7 +297,7 @@
     <condition property="target.module" value="${module}" else="${iterate.hive.test}">
       <isset property="module"/>
     </condition>
-    <for list="${target.module}" param="module">
+    <for keepgoing="${test.continue.on.failure}" list="${target.module}" param="module">
       <sequential>
         <ant antfile="@{module}/build.xml" target="test" inheritAll="false" inheritRefs="true">
           <property name="build.dir.hive" location="${build.dir.hive}"/>
@@ -305,7 +309,7 @@
 
   <target name="test-shims">
     <echo message="Project: ${ant.project.name}"/>
-    <subant target="test" failonerror="${failonerror}">
+    <subant target="test" failonerror="${test.halt.on.failure}">
       <property name="hadoop.version" value="${hadoop.security.version}"/>
       <property name="hadoop.security.version" value="${hadoop.security.version}"/>
       <fileset dir="${hive.root}/shims" includes="build.xml"/>
@@ -506,8 +510,8 @@
     <!-- Package the hcat stuff and pull it up into Hive's build dir -->
     <ant antfile="${hive.root}/hcatalog/build.xml" target="package"
         inheritAll="false"/>
-    <mkdir dir="${build.dir.hive}/hcatalog"/>
-    <copy todir="${build.dir.hive}/hcatalog">
+    <mkdir dir="${target.dir}/hcatalog"/>
+    <copy todir="${target.dir}/hcatalog">
         <fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
     </copy>
   </target>
@@ -711,6 +715,7 @@
         <tarfileset dir="${hive.root}" mode="664" prefix="${tar.final.name}/src"
                     excludes="${vcs.excludes}">
           <exclude name="build/**" />
+          <exclude name="hcatalog/**/build/**" />
           <exclude name="bin/**" />
           <exclude name="**/py/**/*-remote" />
           <exclude name="data/scripts/**" />
@@ -720,6 +725,7 @@
         <tarfileset dir="${hive.root}" mode="755" prefix="${tar.final.name}/src"
                     excludes="${vcs.excludes}">
           <exclude name="build/**" />
+          <exclude name="hcatalog/**/build/**" />
           <include name="bin/**" />
           <include name="**/py/**/*-remote" />
           <include name="data/scripts/**" />
@@ -750,9 +756,6 @@
           <exclude name="docs/**"/>
           <exclude name="lib/py/**/*-remote"/>
         </tarfileset>
-        <tarfileset dir="${build.dir.hive}/hcatalog" mode="755" prefix="${bin.final.name}"
-                    excludes="${vcs.excludes}">
-        </tarfileset>
       </param.listofitems>
     </macro_tar>
   </target>

Modified: hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml (original)
+++ hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml Thu May  9 04:38:38 2013
@@ -31,6 +31,7 @@
     <checkstyle classpathref="checkstyle.class.path"
                 config="${path.to.basedir}/build-support/checkstyle/coding_style.xml">
       <fileset dir="${basedir}">
+          <exclude name="**/.*"/>
           <exclude name="**/build/**"/>
           <exclude name=".idea/**"/>
           <exclude name="historical/**"/>

Modified: hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Thu May  9 04:38:38 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
 public class NotificationListener extends MetaStoreEventListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
-    protected Session session;
     protected Connection conn;
     private static MessageFactory messageFactory = MessageFactory.getInstance();
+    public static final int NUM_RETRIES = 1;
+    private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+    private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+    protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+        @Override
+        protected Session initialValue() {
+            try {
+                return createSession();
+            } catch (Exception e) {
+                LOG.error("Couldn't create JMS Session", e);
+                return null;
+            }
+        }
+
+        @Override
+        public void remove() {
+            if (get() != null) {
+                try {
+                    get().close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS session, ignored error", e);
+                }
+            }
+            super.remove();
+        }
+    };
 
     /**
      * Create message bus connection and session in constructor.
      */
     public NotificationListener(final Configuration conf) {
-
         super(conf);
-        createConnection();
+        testAndCreateConnection();
     }
 
     private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
         // Subscriber can get notification about drop of a database in HCAT
         // by listening on a topic named "HCAT" and message selector string
         // as "HCAT_EVENT = HCAT_DROP_DATABASE"
-        if (dbEvent.getStatus())  {
+        if (dbEvent.getStatus()) {
             String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
             send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
         }
@@ -216,7 +241,7 @@ public class NotificationListener extend
         }
     }
 
-    private String getTopicPrefix(HiveConf conf) {
+    private String getTopicPrefix(Configuration conf) {
         return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
             HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
     }
@@ -253,86 +278,102 @@ public class NotificationListener extend
      * @param topicName is the name on message broker on which message is sent.
      */
     protected void send(HCatEventMessage hCatEventMessage, String topicName) {
-        try {
-            if(null == session){
-                // this will happen, if we never able to establish a connection.
-                createConnection();
-                if (null == session){
-                    // Still not successful, return from here.
-                    LOG.error("Invalid session. Failed to send message on topic: " +
-                            topicName + " event: " + hCatEventMessage.getEventType());
-                    return;
-                }
-            }
-
-            Destination topic = getTopic(topicName);
+        send(hCatEventMessage, topicName, NUM_RETRIES);
+    }
 
-            if (null == topic){
-                // Still not successful, return from here.
-                LOG.error("Invalid session. Failed to send message on topic: " +
-                        topicName + " event: " + hCatEventMessage.getEventType());
-                return;
+    /**
+     * @param hCatEventMessage The HCatEventMessage being sent over JMS. This method is thread-safe.
+     * @param topicName the name of the topic on the message broker to which the message is sent.
+     * @param retries the number of retry attempts
+     */
+    protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+        try {
+            if (session.get() == null) {
+                // Need to reconnect
+                throw new JMSException("Invalid JMS session");
             }
-
-            MessageProducer producer = session.createProducer(topic);
-            Message msg = session.createTextMessage(hCatEventMessage.toString());
+            Destination topic = createTopic(topicName);
+            Message msg = session.get().createTextMessage(hCatEventMessage.toString());
 
             msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+            MessageProducer producer = createProducer(topic);
             producer.send(msg);
             // Message must be transacted before we return.
-            session.commit();
-        }
-        catch(Exception e){
-            // Gobble up the exception. Message delivery is best effort.
-            LOG.error("Failed to send message on topic: " + topicName +
-                    " event: " + hCatEventMessage.getEventType(), e);
+            session.get().commit();
+        } catch (Exception e) {
+            if (retries >= 0) {
+                // this may happen if we were able to establish the connection once, but it's no longer valid
+                LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+                testAndCreateConnection();
+                send(hCatEventMessage, topicName, retries - 1);
+            } else {
+                // Gobble up the exception. Message delivery is best effort.
+                LOG.error("Failed to send message on topic: " + topicName +
+                    " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+            }
         }
     }
 
     /**
-     * Get the topic object for the topicName, it also tries to reconnect
-     * if the connection appears to be broken.
+     * Get the topic object for the topicName
      *
      * @param topicName The String identifying the message-topic.
      * @return A {@link Topic} object corresponding to the specified topicName.
      * @throws JMSException
      */
-    protected Topic getTopic(final String topicName) throws JMSException {
-        Topic topic;
+    protected Topic createTopic(final String topicName) throws JMSException {
+        return session.get().createTopic(topicName);
+    }
+
+    /**
+     * Does a health check on the connection by sending a dummy message.
+     * Creates the connection if the existing connection is found to be bad.
+     * Also recreates the session.
+     */
+    protected synchronized void testAndCreateConnection() {
+        if (conn != null) {
+            // This method is reached when an error occurs while sending a message, so the session must be bad
+            session.remove();
+            if (!isConnectionHealthy()) {
+                // This is the first thread to detect the error: clean up the old connection and reconnect
+                try {
+                    conn.close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS connection, ignored error", e);
+                }
+                conn = createConnection();
+            }
+        } else {
+            conn = createConnection();
+        }
         try {
-            // Topics are created on demand. If it doesn't exist on broker it will
-            // be created when broker receives this message.
-            topic = session.createTopic(topicName);
-        } catch (IllegalStateException ise) {
-            // this will happen if we were able to establish connection once, but its no longer valid,
-            // ise is thrown, catch it and retry.
-            LOG.error("Seems like connection is lost. Retrying", ise);
-            createConnection();
-            topic = session.createTopic(topicName);
+            session.set(createSession());
+        } catch (JMSException e) {
+            LOG.error("Couldn't create JMS session, ignored the error", e);
         }
-        return topic;
     }
 
-    protected void createConnection() {
-
+    /**
+     * Create the JMS connection
+     * @return newly created JMS connection
+     */
+    protected Connection createConnection() {
+        LOG.info("Will create new JMS connection");
         Context jndiCntxt;
+        Connection jmsConnection = null;
         try {
             jndiCntxt = new InitialContext();
-            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
-                .lookup("ConnectionFactory");
-            Connection conn = connFac.createConnection();
-            conn.start();
-            conn.setExceptionListener(new ExceptionListener() {
+            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+            jmsConnection = connFac.createConnection();
+            jmsConnection.start();
+            jmsConnection.setExceptionListener(new ExceptionListener() {
                 @Override
                 public void onException(JMSException jmse) {
-                    LOG.error(jmse.toString());
+                    LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
                 }
             });
-            // We want message to be sent when session commits, thus we run in
-            // transacted mode.
-            session = conn.createSession(true, Session.SESSION_TRANSACTED);
         } catch (NamingException e) {
             LOG.error("JNDI error while setting up Message Bus connection. "
                 + "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
         } catch (Throwable t) {
             LOG.error("Unable to connect to JMS provider", t);
         }
+        return jmsConnection;
+    }
+
+    /**
+     * Send a dummy message to probe if the JMS connection is healthy
+     * @return true if connection is healthy, false otherwise
+     */
+    protected boolean isConnectionHealthy() {
+        try {
+            Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+            MessageProducer producer = createProducer(topic);
+            Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+            producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a JMS session
+     * @return newly created JMS session
+     * @throws JMSException
+     */
+    protected Session createSession() throws JMSException {
+        // We want message to be sent when session commits, thus we run in
+        // transacted mode.
+        return conn.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    /**
+     * Create a JMS producer
+     * @param topic
+     * @return newly created message producer
+     * @throws JMSException
+     */
+    protected MessageProducer createProducer(Destination topic) throws JMSException {
+        return session.get().createProducer(topic);
     }
 
     @Override
     protected void finalize() throws Throwable {
-        // Close the connection before dying.
-        try {
-            if (null != session)
-                session.close();
-            if (conn != null) {
+        if (conn != null) {
+            try {
                 conn.close();
+            } catch (Exception e) {
+                LOG.error("Couldn't close jms connection, ignored the error", e);
             }
-
-        } catch (Exception ignore) {
-            LOG.info("Failed to close message bus connection.", ignore);
         }
     }
 

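The inline comment in the drop-database hunk above explains how these notifications are consumed: a subscriber listens on the configured topic and filters on the HCAT_EVENT property with a JMS message selector. The following is a minimal consumer sketch, not part of this patch: it assumes a JMS provider reachable through JNDI the same way the listener looks up its ConnectionFactory, uses the topic name "HCAT" from the comment above (the actual default comes from HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX), and quotes the selector value as JMS selector syntax requires.

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.InitialContext;

    public class DropDatabaseSubscriber {
        public static void main(String[] args) throws Exception {
            InitialContext jndi = new InitialContext();
            ConnectionFactory factory = (ConnectionFactory) jndi.lookup("ConnectionFactory");
            Connection conn = factory.createConnection();
            conn.start();
            // A non-transacted, auto-acknowledge session is enough for a read-only subscriber.
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("HCAT");
            // Deliver only drop-database events, per the selector documented above.
            MessageConsumer consumer = session.createConsumer(topic, "HCAT_EVENT = 'HCAT_DROP_DATABASE'");
            Message msg = consumer.receive();
            System.out.println(((TextMessage) msg).getText());
            conn.close();
        }
    }
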
Modified: hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Thu May  9 04:38:38 2013
@@ -699,7 +699,9 @@ sub compare
     }
 
     
-    if (defined $testCmd->{'check_job_created'} || defined  $testCmd->{'check_job_complete'}) {
+    if ( (defined $testCmd->{'check_job_created'})
+         || (defined $testCmd->{'check_job_complete'})
+         || (defined $testCmd->{'check_job_exit_value'}) ) {
       my $jobid = $json_hash->{'id'};
       if (!defined $jobid) {
         print $log "$0::$subName INFO check failed: " 
@@ -714,7 +716,7 @@ sub compare
             . "jobresult not defined ";
           $result = 0;
         }
-        if ($testCmd->{'check_job_complete'}) {
+        if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
           my $jobComplete;
           my $NUM_RETRIES = 60;
           my $SLEEP_BETWEEN_RETRIES = 5;
@@ -736,11 +738,21 @@ sub compare
             $result = 0;
           } else { 
             # job has completed, check the runState value
-            my $runState = $res_hash->{'status'}->{'runState'};
-            my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
-            if ( (!defined $runState) || $runState ne $runStateVal) {
-              print $log "check_job_complete failed. got runState  $runState,  expected  $runStateVal";
-              $result = 0;
+            if (defined($testCmd->{'check_job_complete'})) {
+              my $runState = $res_hash->{'status'}->{'runState'};
+              my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
+              if ( (!defined $runState) || $runState ne $runStateVal) {
+                print $log "check_job_complete failed. got runState  $runState,  expected  $runStateVal";
+                $result = 0;
+              }
+            }
+            if (defined($testCmd->{'check_job_exit_value'})) {
+              my $exitValue = $res_hash->{'exitValue'};
+              my $expectedExitValue = $testCmd->{'check_job_exit_value'};
+              if ( (!defined $exitValue) || $exitValue ne $expectedExitValue) {
+                print $log "check_job_exit_value failed. got exitValue $exitValue,  expected  $expectedExitValue";
+                $result = 0;
+              }
             }
           }
         }

Modified: hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Thu May  9 04:38:38 2013
@@ -49,6 +49,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -107,6 +108,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
    ]
@@ -128,6 +130,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'FAILURE', 
+     'check_job_exit_value' => 8,
      'check_call_back' => 1,
     },
     {
@@ -140,7 +143,9 @@ $cfg = 
                                 #results
      'status_code' => 200,
      'check_job_created' => 1,
+     'check_job_exit_value' => 0,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -154,6 +159,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -167,6 +173,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -181,6 +188,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -197,6 +205,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -212,6 +221,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -257,6 +267,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'FAILURE', 
+     'check_job_exit_value' => 11,
 
     },
  
@@ -271,6 +282,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -285,6 +297,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
     {
@@ -298,6 +311,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -312,6 +326,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -326,6 +341,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
 
@@ -340,6 +356,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
      'check_call_back' => 1,
 
     },
@@ -356,6 +373,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_exit_value' => 0,
 
     },
 

Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java Thu May  9 04:38:38 2013
@@ -64,11 +64,14 @@ public class HiveAlterHandler implements
       throw new InvalidOperationException("New table is invalid: " + newt);
     }
 
-    if (!MetaStoreUtils.validateName(newt.getTableName())
-        || !MetaStoreUtils.validateTblColumns(newt.getSd().getCols())) {
+    if (!MetaStoreUtils.validateName(newt.getTableName())) {
       throw new InvalidOperationException(newt.getTableName()
           + " is not a valid object name");
     }
+    String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
+    if (validate != null) {
+      throw new InvalidOperationException("Invalid column " + validate);
+    }
 
     Path srcPath = null;
     FileSystem srcFs = null;

Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu May  9 04:38:38 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
 import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
 import org.apache.hadoop.hive.metastore.api.Type;
@@ -1003,20 +1004,32 @@ public class HiveMetaStore extends Thrif
         throws AlreadyExistsException, MetaException,
         InvalidObjectException, NoSuchObjectException {
 
-      if (!MetaStoreUtils.validateName(tbl.getTableName())
-          || !MetaStoreUtils.validateTblColumns(tbl.getSd().getCols())
-          || (tbl.getPartitionKeys() != null && !MetaStoreUtils
-              .validateTblColumns(tbl.getPartitionKeys()))
-          || !MetaStoreUtils.validateSkewedColNames(
-              (null == tbl.getSd().getSkewedInfo()) ?
-                  null : tbl.getSd().getSkewedInfo().getSkewedColNames())
-          || !MetaStoreUtils.validateSkewedColNamesSubsetCol(
-              (null == tbl.getSd().getSkewedInfo()) ?
-                  null : tbl.getSd().getSkewedInfo().getSkewedColNames(),
-              tbl.getSd().getCols())) {
+      if (!MetaStoreUtils.validateName(tbl.getTableName())) {
         throw new InvalidObjectException(tbl.getTableName()
             + " is not a valid object name");
       }
+      String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
+      if (validate != null) {
+        throw new InvalidObjectException("Invalid column " + validate);
+      }
+      if (tbl.getPartitionKeys() != null) {
+        validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid partition column " + validate);
+        }
+      }
+      SkewedInfo skew = tbl.getSd().getSkewedInfo();
+      if (skew != null) {
+        validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid skew column " + validate);
+        }
+        validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
+            skew.getSkewedColNames(), tbl.getSd().getCols());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid skew column " + validate);
+        }
+      }
 
       Path tblPath = null;
       boolean success = false, madeDir = false;

Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Thu May  9 04:38:38 2013
@@ -352,16 +352,16 @@ public class MetaStoreUtils {
     return false;
   }
 
-  static public boolean validateTblColumns(List<FieldSchema> cols) {
+  static public String validateTblColumns(List<FieldSchema> cols) {
     for (FieldSchema fieldSchema : cols) {
       if (!validateName(fieldSchema.getName())) {
-        return false;
+        return "name: " + fieldSchema.getName();
       }
       if (!validateColumnType(fieldSchema.getType())) {
-        return false;
+        return "type: " + fieldSchema.getType();
       }
     }
-    return true;
+    return null;
   }
 
   static void throwExceptionIfIncompatibleColTypeChange(
@@ -434,22 +434,22 @@ public class MetaStoreUtils {
     return true;
   }
 
-  public static  boolean validateSkewedColNames(List<String> cols) {
+  public static String validateSkewedColNames(List<String> cols) {
     if (null == cols) {
-      return true;
+      return null;
     }
     for (String col : cols) {
       if (!validateName(col)) {
-        return false;
+        return col;
       }
     }
-    return true;
+    return null;
   }
 
-  public static boolean validateSkewedColNamesSubsetCol(List<String> skewedColNames,
+  public static String validateSkewedColNamesSubsetCol(List<String> skewedColNames,
       List<FieldSchema> cols) {
     if (null == skewedColNames) {
-      return true;
+      return null;
     }
     List<String> colNames = new ArrayList<String>();
     for (FieldSchema fieldSchema : cols) {
@@ -459,7 +459,10 @@ public class MetaStoreUtils {
     List<String> copySkewedColNames = new ArrayList<String>(skewedColNames);
     // remove valid columns
     copySkewedColNames.removeAll(colNames);
-    return (copySkewedColNames.size() > 0) ? false : true;
+    if (copySkewedColNames.isEmpty()) {
+      return null;
+    }
+    return copySkewedColNames.toString();
   }
 
   public static String getListType(String t) {

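Taken together, the HiveAlterHandler and HiveMetaStore hunks above and these MetaStoreUtils changes replace the old boolean checks with methods that return a description of the first offending column, or null when everything is valid. The snippet below is a hedged illustration of that contract only; the column definitions are invented, FieldSchema is the thrift-generated metastore class, and the "Invalid column" prefix matches the callers above.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.InvalidObjectException;

    public class ValidateColumnsExample {
        public static void main(String[] args) throws InvalidObjectException {
            List<FieldSchema> cols = Arrays.asList(
                new FieldSchema("id", "int", null),
                new FieldSchema("bad-name", "string", null)); // '-' is not a valid name character
            String validate = MetaStoreUtils.validateTblColumns(cols);
            // Expected: "name: bad-name"; a null result means every column passed validation.
            if (validate != null) {
                throw new InvalidObjectException("Invalid column " + validate);
            }
        }
    }
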
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Thu May  9 04:38:38 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
@@ -532,11 +533,14 @@ public class HiveHistory {
     return null;
 
   }
+
+  public void closeStream() {
+    IOUtils.cleanup(LOG, histStream);
+  }
+
   @Override
   public void finalize() throws Throwable {
-    if (histStream !=null){
-      histStream.close();
-    }
+    closeStream();
     super.finalize();
   }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu May  9 04:38:38 2013
@@ -140,7 +140,200 @@ import org.apache.hadoop.util.Reflection
  * </ul>
  * </li>
  * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
  *
+ * rcfile ::=
+ *   <file-header>
+ *   <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ *   <file-version-header>
+ *   <file-key-class-name>              (only exists if version is seq6)
+ *   <file-value-class-name>            (only exists if version is seq6)
+ *   <file-is-compressed>
+ *   <file-is-block-compressed>         (only exists if version is seq6)
+ *   [<file-compression-codec-class>]
+ *   <file-header-metadata>
+ *   <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header.  A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ *                     |   Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ *   Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ *   <rowgroup-header>
+ *   <rowgroup-key-data>
+ *   <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ *   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ *   <rowgroup-record-length>
+ *   <rowgroup-key-length>
+ *   <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ *   <rowgroup-num-rows>
+ *   <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ *   <column-buffer-length>
+ *   <column-buffer-uncompressed-length>
+ *   <column-key-buffer-length>
+ *   <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-value-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
  */
 public class RCFile {
 

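The column-key-buffer production above describes the run-length convention for field lengths only in prose. The decoder sketch below makes it concrete; it is an illustration of the documented scheme rather than code taken from RCFile, and it assumes the VInt values have already been decoded into plain Java ints.

    import java.util.ArrayList;
    import java.util.List;

    public class KeyBufferLengths {
        // Non-negative values are literal field lengths; a negative value v means
        // "repeat the previous length ~v more times", so {1, ~2, 2} expands to {1, 1, 1, 2}.
        static List<Integer> expand(int[] encoded) {
            List<Integer> lengths = new ArrayList<Integer>();
            int prev = 0;
            for (int v : encoded) {
                if (v >= 0) {
                    lengths.add(v);
                    prev = v;
                } else {
                    for (int i = 0; i < ~v; i++) {
                        lengths.add(prev);
                    }
                }
            }
            return lengths;
        }

        public static void main(String[] args) {
            System.out.println(expand(new int[]{1, ~2, 2})); // prints [1, 1, 1, 2]
        }
    }
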
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Thu May  9 04:38:38 2013
@@ -103,7 +103,7 @@ final class DynamicByteArray {
    * @param value the array to copy from
    * @param valueOffset the first location to copy from value
    * @param valueLength the number of bytes to copy from value
-   * @return
+   * @return the offset of the start of the value
    */
   public int add(byte[] value, int valueOffset, int valueLength) {
     int i = length / chunkSize;
@@ -293,4 +293,11 @@ final class DynamicByteArray {
     }
     return result;
   }
+
+  /**
+   * Get the size of the buffers.
+   */
+  public long getSizeInBytes() {
+    return initializedChunks * chunkSize;
+  }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java Thu May  9 04:38:38 2013
@@ -135,5 +135,8 @@ final class DynamicIntArray {
     return sb.toString();
   }
 
+  public int getSizeInBytes() {
+    return 4 * initializedChunks * chunkSize;
+  }
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Thu May  9 04:38:38 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,19 +35,25 @@ import java.util.Map;
  * dynamic partitions, it is easy to end up with many writers in the same task.
  * By managing the size of each allocation, we try to cut down the size of each
  * allocation and keep the task from running out of memory.
+ *
+ * This class is thread safe and uses synchronization around the shared state
+ * to prevent race conditions.
  */
 class MemoryManager {
+
+  private static final Log LOG = LogFactory.getLog(MemoryManager.class);
+
   /**
-   * How much does the pool need to change between notifications?
+   * How often should we check the memory sizes? Measured in rows added
+   * to all of the writers.
    */
-  private static final double NOTIFICATION_FACTOR = 1.1;
+  private static final int ROWS_BETWEEN_CHECKS = 5000;
   private final long totalMemoryPool;
-  private long notificationTrigger;
   private final Map<Path, WriterInfo> writerList =
       new HashMap<Path, WriterInfo>();
   private long totalAllocation = 0;
   private double currentScale = 1;
-  private double lastNotificationScale = 1;
+  private int rowsAddedSinceCheck = 0;
 
   private static class WriterInfo {
     long allocation;
@@ -57,7 +65,13 @@ class MemoryManager {
   }
 
   public interface Callback {
-    void checkMemory(double newScale) throws IOException;
+    /**
+     * The writer needs to check its memory usage
+     * @param newScale the current scale factor for memory allocations
+     * @return true if the writer was over the limit
+     * @throws IOException
+     */
+    boolean checkMemory(double newScale) throws IOException;
   }
 
   /**
@@ -70,22 +84,26 @@ class MemoryManager {
     double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
     totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
         getHeapMemoryUsage().getMax() * maxLoad);
-    notificationTrigger = Math.round(totalMemoryPool * NOTIFICATION_FACTOR);
   }
 
   /**
-   * Add a new writer's memory allocation to the pool
+   * Add a new writer's memory allocation to the pool. We use the path
+   * as a unique key to ensure that we don't get duplicates.
    * @param path the file that is being written
    * @param requestedAllocation the requested buffer size
    */
   synchronized void addWriter(Path path, long requestedAllocation,
                               Callback callback) throws IOException {
     WriterInfo oldVal = writerList.get(path);
+    // this should always be null, but we handle the case where the memory
+    // manager wasn't told that a writer is no longer in use and the task
+    // starts writing to the same path.
     if (oldVal == null) {
       oldVal = new WriterInfo(requestedAllocation, callback);
       writerList.put(path, oldVal);
       totalAllocation += requestedAllocation;
     } else {
+      // handle a new writer that is writing to the same path
       totalAllocation += requestedAllocation - oldVal.allocation;
       oldVal.allocation = requestedAllocation;
       oldVal.callback = callback;
@@ -125,6 +143,31 @@ class MemoryManager {
   }
 
   /**
+   * Give the memory manager an opportunity for doing a memory check.
+   * @throws IOException
+   */
+  synchronized void addedRow() throws IOException {
+    if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+      notifyWriters();
+    }
+  }
+
+  /**
+   * Notify all of the writers that they should check their memory usage.
+   * @throws IOException
+   */
+  private void notifyWriters() throws IOException {
+    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+    for(WriterInfo writer: writerList.values()) {
+      boolean flushed = writer.callback.checkMemory(currentScale);
+      if (LOG.isDebugEnabled() && flushed) {
+        LOG.debug("flushed " + writer.toString());
+      }
+    }
+    rowsAddedSinceCheck = 0;
+  }
+
+  /**
    * Update the currentScale based on the current allocation and pool size.
    * This also updates the notificationTrigger.
    * @param isAllocate is this an allocation?
@@ -135,21 +178,5 @@ class MemoryManager {
     } else {
       currentScale = (double) totalMemoryPool / totalAllocation;
     }
-    if (!isAllocate) {
-      // ensure that we notify if we drop 10% from the high water mark
-      notificationTrigger =
-          Math.min(notificationTrigger,
-              Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale));
-    } else {
-      // we've allocated a new writer, so check to see if we need to notify
-      if (totalAllocation > notificationTrigger) {
-        for(WriterInfo writer: writerList.values()) {
-          writer.callback.checkMemory(currentScale);
-        }
-        // set the next notification trigger
-        notificationTrigger =
-            Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale);
-      }
-    }
   }
 }

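The replacement of the notification-trigger logic above leaves updateScale() computing a single scale factor that shrinks every writer's budget once the writers collectively ask for more memory than the pool holds. The sketch below mirrors only that arithmetic; the pool size and allocations are invented, and how a writer applies the scale lives in WriterImpl rather than in this hunk, so the final comment is an assumption about the caller.

    public class MemoryScaleExample {
        public static void main(String[] args) {
            long totalMemoryPool = 600L * 1024 * 1024;     // e.g. 600 MB reserved for ORC writers
            long totalAllocation = 3 * 256L * 1024 * 1024; // three writers, 256 MB requested each

            // Mirrors updateScale(): scale down proportionally when oversubscribed, never above 1.
            double currentScale = totalAllocation <= totalMemoryPool
                ? 1.0
                : (double) totalMemoryPool / totalAllocation;

            // Each writer is expected to budget roughly requestedAllocation * currentScale
            // when checkMemory(newScale) fires, which now happens about every 5,000 added rows.
            System.out.printf("scale = %.2f, per-writer budget = %d bytes%n",
                currentScale, (long) (256L * 1024 * 1024 * currentScale));
        }
    }
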
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Thu May  9 04:38:38 2013
@@ -21,12 +21,16 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -58,8 +62,18 @@ final class OrcStruct implements Writabl
     return fields.length;
   }
 
+  /**
+   * Change the number of fields in the struct. No effect if the number of
+   * fields is the same. The old field values are copied to the new array.
+   * @param numFields the new number of fields
+   */
   public void setNumFields(int numFields) {
-    fields = new Object[numFields];
+    if (fields.length != numFields) {
+      Object[] oldFields = fields;
+      fields = new Object[numFields];
+      System.arraycopy(oldFields, 0, fields, 0,
+          Math.min(oldFields.length, numFields));
+    }
   }
 
    @Override
@@ -148,7 +162,7 @@ final class OrcStruct implements Writabl
     }
   }
 
-  static class OrcStructInspector extends StructObjectInspector {
+  static class OrcStructInspector extends SettableStructObjectInspector {
     private final List<StructField> fields;
 
     OrcStructInspector(StructTypeInfo info) {
@@ -223,9 +237,52 @@ final class OrcStruct implements Writabl
     public Category getCategory() {
       return Category.STRUCT;
     }
+
+    @Override
+    public Object create() {
+      return new OrcStruct(0);
+    }
+
+    @Override
+    public Object setStructFieldData(Object struct, StructField field,
+                                     Object fieldValue) {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      int offset = ((Field) field).offset;
+      // if the offset is bigger than our current number of fields, grow it
+      if (orcStruct.getNumFields() <= offset) {
+        orcStruct.setNumFields(offset+1);
+      }
+      orcStruct.setFieldValue(offset, fieldValue);
+      return struct;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      } else if (o == this) {
+        return true;
+      } else {
+        List<StructField> other = ((OrcStructInspector) o).fields;
+        if (other.size() != fields.size()) {
+          return false;
+        }
+        for(int i = 0; i < fields.size(); ++i) {
+          StructField left = other.get(i);
+          StructField right = fields.get(i);
+          if (!(left.getFieldName().equals(right.getFieldName()) &&
+                left.getFieldObjectInspector().equals
+                    (right.getFieldObjectInspector()))) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }
   }
 
-  static class OrcMapObjectInspector implements MapObjectInspector {
+  static class OrcMapObjectInspector
+      implements MapObjectInspector, SettableMapObjectInspector {
     private final ObjectInspector key;
     private final ObjectInspector value;
 
@@ -275,9 +332,45 @@ final class OrcStruct implements Writabl
     public Category getCategory() {
       return Category.MAP;
     }
+
+    @Override
+    public Object create() {
+      return new HashMap<Object,Object>();
+    }
+
+    @Override
+    public Object put(Object map, Object key, Object value) {
+      ((Map) map).put(key, value);
+      return map;
+    }
+
+    @Override
+    public Object remove(Object map, Object key) {
+      ((Map) map).remove(key);
+      return map;
+    }
+
+    @Override
+    public Object clear(Object map) {
+      ((Map) map).clear();
+      return map;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      } else if (o == this) {
+        return true;
+      } else {
+        OrcMapObjectInspector other = (OrcMapObjectInspector) o;
+        return other.key.equals(key) && other.value.equals(value);
+      }
+    }
   }
 
-  static class OrcListObjectInspector implements ListObjectInspector {
+  static class OrcListObjectInspector
+      implements ListObjectInspector, SettableListObjectInspector {
     private final ObjectInspector child;
 
     OrcListObjectInspector(ListTypeInfo info) {
@@ -319,6 +412,43 @@ final class OrcStruct implements Writabl
     public Category getCategory() {
       return Category.LIST;
     }
+
+    @Override
+    public Object create(int size) {
+      ArrayList<Object> result = new ArrayList<Object>(size);
+      for(int i = 0; i < size; ++i) {
+        result.add(null);
+      }
+      return result;
+    }
+
+    @Override
+    public Object set(Object list, int index, Object element) {
+      List l = (List) list;
+      for(int i=l.size(); i < index+1; ++i) {
+        l.add(null);
+      }
+      l.set(index, element);
+      return list;
+    }
+
+    @Override
+    public Object resize(Object list, int newSize) {
+      ((ArrayList) list).ensureCapacity(newSize);
+      return list;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      } else if (o == this) {
+        return true;
+      } else {
+        ObjectInspector other = ((OrcListObjectInspector) o).child;
+        return other.equals(child);
+      }
+    }
   }
 
   static ObjectInspector createObjectInspector(TypeInfo info) {
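
The OrcStructInspector above now extends SettableStructObjectInspector, and setStructFieldData grows the struct's field array on demand before writing the value. Below is a rough stand-alone sketch of that grow-on-set behavior, using a hypothetical SketchStruct rather than the real OrcStruct or ObjectInspector classes.

    import java.util.Arrays;

    // Hypothetical stand-in for the grow-on-set behavior described above.
    class SketchStruct {
      private Object[] fields = new Object[0];

      // Resize the field array, preserving existing values (no-op if equal).
      void setNumFields(int numFields) {
        if (fields.length != numFields) {
          fields = Arrays.copyOf(fields, numFields);
        }
      }

      // Grow the array if needed so the offset is valid, then set the value.
      void setFieldValue(int offset, Object value) {
        if (fields.length <= offset) {
          setNumFields(offset + 1);
        }
        fields[offset] = value;
      }

      public static void main(String[] args) {
        SketchStruct s = new SketchStruct();
        s.setFieldValue(2, "c"); // fields 0 and 1 remain null; length becomes 3
      }
    }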

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java Thu May  9 04:38:38 2013
@@ -134,5 +134,25 @@ final class OrcUnion implements UnionObj
     public Category getCategory() {
       return Category.UNION;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      } else if (o == this) {
+        return true;
+      } else {
+        List<ObjectInspector> other = ((OrcUnionObjectInspector) o).children;
+        if (other.size() != children.size()) {
+          return false;
+        }
+        for(int i = 0; i < children.size(); ++i) {
+          if (!other.get(i).equals(children.get(i))) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }
   }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Thu May  9 04:38:38 2013
@@ -23,15 +23,40 @@ import java.nio.ByteBuffer;
 class OutStream extends PositionedOutputStream {
 
   interface OutputReceiver {
+    /**
+     * Output the given buffer to the final destination
+     * @param buffer the buffer to output
+     * @throws IOException
+     */
     void output(ByteBuffer buffer) throws IOException;
   }
 
   static final int HEADER_SIZE = 3;
   private final String name;
   private final OutputReceiver receiver;
+
+  /**
+   * Stores the uncompressed bytes that have been serialized, but not
+   * compressed yet. When this fills, we compress the entire buffer.
+   */
+  private ByteBuffer current = null;
+
+  /**
+   * Stores the compressed bytes until we have a full buffer and then outputs
+   * them to the receiver. If no compression is being done, this (and overflow)
+   * will always be null and the current buffer will be sent directly to the
+   * receiver.
+   */
   private ByteBuffer compressed = null;
+
+  /**
+   * Since the compressed buffer may start with contents from previous
+   * compression blocks, we allocate an overflow buffer so that the
+   * output of the codec can be split between the two buffers. After the
+   * compressed buffer is sent to the receiver, the overflow buffer becomes
+   * the new compressed buffer.
+   */
   private ByteBuffer overflow = null;
-  private ByteBuffer current;
   private final int bufferSize;
   private final CompressionCodec codec;
   private long compressedBytes = 0;
@@ -85,9 +110,11 @@ class OutStream extends PositionedOutput
     }
   }
 
+  /**
+   * Allocate a new output buffer if we are compressing.
+   */
   private ByteBuffer getNewOutputBuffer() throws IOException {
-    return ByteBuffer.allocate(bufferSize +
-      (codec == null ? 0 : HEADER_SIZE));
+    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
   }
 
   private void flip() throws IOException {
@@ -128,7 +155,8 @@ class OutStream extends PositionedOutput
 
   private void spill() throws java.io.IOException {
     // if there isn't anything in the current buffer, don't spill
-    if (current == null || current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+    if (current == null ||
+        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
       return;
     }
     flip();
@@ -223,8 +251,18 @@ class OutStream extends PositionedOutput
   }
 
   @Override
-  public long getSize() {
-    return uncompressedBytes + compressedBytes;
+  public long getBufferSize() {
+    long result = 0;
+    if (current != null) {
+      result += current.capacity();
+    }
+    if (compressed != null) {
+      result += compressed.capacity();
+    }
+    if (overflow != null) {
+      result += overflow.capacity();
+    }
+    return result;
   }
 }
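
The getSize() to getBufferSize() change above switches the stream from reporting logical bytes written to reporting allocated buffer capacity, which is what the memory manager needs to bound. Here is a tiny sketch of the difference, using plain java.nio and an assumed buffer size.

    import java.nio.ByteBuffer;

    // Sketch: position() is the logical size, capacity() is what the memory
    // manager should account for. The buffer size value is an assumption.
    public class BufferAccountingSketch {
      public static void main(String[] args) {
        ByteBuffer current = ByteBuffer.allocate(256 * 1024);
        current.put(new byte[100]);               // only 100 bytes written so far

        long logicalBytes = current.position();   // 100
        long allocatedBytes = current.capacity(); // 262144

        System.out.println("logical=" + logicalBytes +
            " allocated=" + allocatedBytes);
      }
    }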
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java Thu May  9 04:38:38 2013
@@ -21,6 +21,18 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 abstract class PositionedOutputStream extends OutputStream {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
   abstract void getPosition(PositionRecorder recorder) throws IOException;
-  abstract long getSize();
+
+  /**
+   * Get the memory size currently allocated as buffers associated with this
+   * stream.
+   * @return the number of bytes used by buffers.
+   */
+  abstract long getBufferSize();
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java Thu May  9 04:38:38 2013
@@ -25,13 +25,11 @@ package org.apache.hadoop.hive.ql.io.orc
  */
 abstract class RedBlackTree {
   public static final int NULL = -1;
-  private static final int DEFAULT_INITIAL_CAPACITY = 16 * 1024;
 
   // Various values controlling the offset of the data within the array.
   private static final int LEFT_OFFSET = 0;
   private static final int RIGHT_OFFSET = 1;
-  private static final int COUNT_OFFSET = 2;
-  private static final int ELEMENT_SIZE = 3;
+  private static final int ELEMENT_SIZE = 2;
 
   protected int size = 0;
   private final DynamicIntArray data;
@@ -40,13 +38,6 @@ abstract class RedBlackTree {
   private boolean wasAdd = false;
 
   /**
-   * Create a set with a default initial capacity.
-   */
-  public RedBlackTree() {
-    data = new DynamicIntArray(DEFAULT_INITIAL_CAPACITY * ELEMENT_SIZE);
-  }
-
-  /**
    * Create a set with the given initial capacity.
    */
   public RedBlackTree(int initialCapacity) {
@@ -63,7 +54,6 @@ abstract class RedBlackTree {
     size += 1;
     setLeft(position, left, isRed);
     setRight(position, right);
-    setCount(position, 1);
     return position;
   }
 
@@ -109,18 +99,6 @@ abstract class RedBlackTree {
     return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
   }
 
-  protected int getCount(int position) {
-    return data.get(position * ELEMENT_SIZE + COUNT_OFFSET);
-  }
-
-  private void setCount(int position, int value) {
-    data.set(position * ELEMENT_SIZE + COUNT_OFFSET, value);
-  }
-
-  private void incrementCount(int position, int value) {
-    data.increment(position * ELEMENT_SIZE + COUNT_OFFSET, value);
-  }
-
   /**
    * Set the left field of the given position.
    * Note that we are storing the node color in the low bit of the left pointer.
@@ -200,7 +178,6 @@ abstract class RedBlackTree {
       } else {
         lastAdd = node;
         wasAdd = false;
-        incrementCount(node, 1);
         return false;
       }
 
@@ -322,5 +299,11 @@ abstract class RedBlackTree {
     data.clear();
   }
 
+  /**
+   * Get the buffer size in bytes.
+   */
+  public long getSizeInBytes() {
+    return data.getSizeInBytes();
+  }
 }
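
The RedBlackTree change above drops the per-node count field, shrinking each node from three ints to two (left and right, with the node color packed into the low bit of the left pointer, as the existing comment notes). Below is a small sketch of one plausible encoding of that layout, with a plain int[] standing in for DynamicIntArray; the exact bit packing is an assumption for illustration, not a copy of the Hive implementation.

    // Sketch of a two-int-per-node layout with the color in the low bit of
    // the left pointer; the bit packing shown here is assumed.
    class NodeLayoutSketch {
      static final int LEFT_OFFSET = 0;
      static final int RIGHT_OFFSET = 1;
      static final int ELEMENT_SIZE = 2;

      static void setLeft(int[] data, int position, int left, boolean isRed) {
        data[position * ELEMENT_SIZE + LEFT_OFFSET] =
            (left << 1) | (isRed ? 1 : 0);
      }

      static int getLeft(int[] data, int position) {
        return data[position * ELEMENT_SIZE + LEFT_OFFSET] >> 1;
      }

      static boolean isRed(int[] data, int position) {
        return (data[position * ELEMENT_SIZE + LEFT_OFFSET] & 1) == 1;
      }

      static int getRight(int[] data, int position) {
        return data[position * ELEMENT_SIZE + RIGHT_OFFSET];
      }
    }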
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Thu May  9 04:38:38 2013
@@ -24,19 +24,16 @@ import java.io.OutputStream;
 
 /**
  * A red-black tree that stores strings. The strings are stored as UTF-8 bytes
- * and an offset/length for each entry.
+ * and an offset for each entry.
  */
 class StringRedBlackTree extends RedBlackTree {
   private final DynamicByteArray byteArray = new DynamicByteArray();
-  private final DynamicIntArray keySizes = new DynamicIntArray();
+  private final DynamicIntArray keyOffsets;
   private final Text newKey = new Text();
 
-  public StringRedBlackTree() {
-    // PASS
-  }
-
   public StringRedBlackTree(int initialCapacity) {
     super(initialCapacity);
+    keyOffsets = new DynamicIntArray(initialCapacity);
   }
 
   public int add(String value) {
@@ -44,16 +41,22 @@ class StringRedBlackTree extends RedBlac
     // if the key is new, add it to our byteArray and store the offset
     if (add()) {
       int len = newKey.getLength();
-      keySizes.add(byteArray.add(newKey.getBytes(), 0, len));
-      keySizes.add(len);
+      keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len));
     }
     return lastAdd;
   }
 
   @Override
   protected int compareValue(int position) {
+    int start = keyOffsets.get(position);
+    int end;
+    if (position + 1 == keyOffsets.size()) {
+      end = byteArray.size();
+    } else {
+      end = keyOffsets.get(position+1);
+    }
     return byteArray.compare(newKey.getBytes(), 0, newKey.getLength(),
-      keySizes.get(2 * position), keySizes.get(2 * position + 1));
+                             start, end - start);
   }
 
   /**
@@ -84,12 +87,6 @@ class StringRedBlackTree extends RedBlac
      * @return the string's length in bytes
      */
     int getLength();
-
-    /**
-     * Get the count for this key.
-     * @return the number of times this key was added
-     */
-    int getCount();
   }
 
   /**
@@ -106,6 +103,8 @@ class StringRedBlackTree extends RedBlac
 
   private class VisitorContextImpl implements VisitorContext {
     private int originalPosition;
+    private int start;
+    private int end;
     private final Text text = new Text();
 
     public int getOriginalPosition() {
@@ -113,20 +112,26 @@ class StringRedBlackTree extends RedBlac
     }
 
     public Text getText() {
-      byteArray.setText(text, keySizes.get(originalPosition * 2), getLength());
+      byteArray.setText(text, start, end - start);
       return text;
     }
 
     public void writeBytes(OutputStream out) throws IOException {
-      byteArray.write(out, keySizes.get(originalPosition * 2), getLength());
+      byteArray.write(out, start, end - start);
     }
 
     public int getLength() {
-      return keySizes.get(originalPosition * 2 + 1);
+      return end - start;
     }
 
-    public int getCount() {
-      return StringRedBlackTree.this.getCount(originalPosition);
+    void setPosition(int position) {
+      originalPosition = position;
+      start = keyOffsets.get(originalPosition);
+      if (position + 1 == keyOffsets.size()) {
+        end = byteArray.size();
+      } else {
+        end = keyOffsets.get(originalPosition + 1);
+      }
     }
   }
 
@@ -134,7 +139,7 @@ class StringRedBlackTree extends RedBlac
                       ) throws IOException {
     if (node != NULL) {
       recurse(getLeft(node), visitor, context);
-      context.originalPosition = node;
+      context.setPosition(node);
       visitor.visit(context);
       recurse(getRight(node), visitor, context);
     }
@@ -155,7 +160,7 @@ class StringRedBlackTree extends RedBlac
   public void clear() {
     super.clear();
     byteArray.clear();
-    keySizes.clear();
+    keyOffsets.clear();
   }
 
   /**
@@ -170,7 +175,8 @@ class StringRedBlackTree extends RedBlac
    * Calculate the approximate size in memory.
    * @return the number of bytes used in storing the tree.
    */
-  public long getByteSize() {
-    return byteArray.size() + 5 * 4 * size();
+  public long getSizeInBytes() {
+    return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() +
+      super.getSizeInBytes();
   }
 }
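
StringRedBlackTree now stores only one offset per key and derives each key's length from the next key's offset, or from the end of the byte array for the last key. A short sketch of that bounds computation follows, with plain arrays standing in for DynamicIntArray and DynamicByteArray.

    // Sketch of deriving {start, length} for entry i from consecutive offsets.
    public class OffsetBoundsSketch {
      static int[] bounds(int[] keyOffsets, int totalBytes, int position) {
        int start = keyOffsets[position];
        int end = (position + 1 == keyOffsets.length)
            ? totalBytes                  // last key runs to the end of the data
            : keyOffsets[position + 1];   // otherwise to the next key's start
        return new int[]{start, end - start};
      }

      public static void main(String[] args) {
        int[] offsets = {0, 3, 8};              // three keys packed into 12 bytes
        int[] last = bounds(offsets, 12, 2);    // {8, 4}
        System.out.println(last[0] + "," + last[1]);
      }
    }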

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Thu May  9 04:38:38 2013
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,9 +64,15 @@ import com.google.protobuf.CodedOutputSt
  * type of column. TreeWriters may have children TreeWriters that handle the
  * sub-types. Each of the TreeWriters writes the column's data as a set of
  * streams.
+ *
+ * This class is synchronized so that multi-threaded access is safe. In
+ * particular, because the MemoryManager is shared between writers, this class
+ * assumes that checkMemory may be called from a separate thread.
  */
 class WriterImpl implements Writer, MemoryManager.Callback {
 
+  private static final Log LOG = LogFactory.getLog(WriterImpl.class);
+
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
@@ -154,10 +162,18 @@ class WriterImpl implements Writer, Memo
   }
 
   @Override
-  public void checkMemory(double newScale) throws IOException {
-    if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
-     flushStripe();
+  public synchronized boolean checkMemory(double newScale) throws IOException {
+    long limit = (long) Math.round(stripeSize * newScale);
+    long size = estimateStripeSize();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+                limit);
+    }
+    if (size > limit) {
+      flushStripe();
+      return true;
     }
+    return false;
   }
 
   /**
@@ -186,6 +202,18 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Get the number of bytes in buffers that are allocated to this stream.
+     * @return number of bytes in buffers
+     */
+    public long getBufferSize() {
+      long result = 0;
+      for(ByteBuffer buf: output) {
+        result += buf.capacity();
+      }
+      return outStream.getBufferSize() + result;
+    }
+
+    /**
      * Flush the stream to the codec.
      * @throws IOException
      */
@@ -214,12 +242,9 @@ class WriterImpl implements Writer, Memo
       }
     }
 
-    /**
-     * Get the size of compressed and uncompressed data in the stream's buffers.
-     * @return the number of bytes in the buffers.
-     */
-    long getSize() {
-      return outStream.getSize();
+    @Override
+    public String toString() {
+      return outStream.toString();
     }
   }
 
@@ -681,11 +706,12 @@ class WriterImpl implements Writer, Memo
   }
 
   private static class StringTreeWriter extends TreeWriter {
+    private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final PositionedOutputStream stringOutput;
     private final RunLengthIntegerWriter lengthOutput;
     private final RunLengthIntegerWriter rowOutput;
-    private final RunLengthIntegerWriter countOutput;
-    private final StringRedBlackTree dictionary = new StringRedBlackTree();
+    private final StringRedBlackTree dictionary =
+        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
     private final DynamicIntArray rows = new DynamicIntArray();
     private final List<OrcProto.RowIndexEntry> savedRowIndex =
         new ArrayList<OrcProto.RowIndexEntry>();
@@ -703,12 +729,6 @@ class WriterImpl implements Writer, Memo
           OrcProto.Stream.Kind.LENGTH), false);
       rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA), false);
-      if (writer.buildIndex()) {
-        countOutput = new RunLengthIntegerWriter(writer.createStream(id,
-            OrcProto.Stream.Kind.DICTIONARY_COUNT), false);
-      } else {
-        countOutput = null;
-      }
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(0L);
       buildIndex = writer.buildIndex();
@@ -739,9 +759,6 @@ class WriterImpl implements Writer, Memo
           context.writeBytes(stringOutput);
           lengthOutput.write(context.getLength());
           dumpOrder[context.getOriginalPosition()] = currentId++;
-          if (countOutput != null) {
-            countOutput.write(context.getCount());
-          }
         }
       });
       int length = rows.size();
@@ -770,9 +787,6 @@ class WriterImpl implements Writer, Memo
       stringOutput.flush();
       lengthOutput.flush();
       rowOutput.flush();
-      if (countOutput != null) {
-        countOutput.flush();
-      }
       // reset all of the fields to be ready for the next stripe.
       dictionary.clear();
       rows.clear();
@@ -809,7 +823,7 @@ class WriterImpl implements Writer, Memo
 
     @Override
     long estimateMemory() {
-      return rows.size() * 4 + dictionary.getByteSize();
+      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
     }
   }
 
@@ -1434,40 +1448,43 @@ class WriterImpl implements Writer, Memo
   private long estimateStripeSize() {
     long result = 0;
     for(BufferedStream stream: streams.values()) {
-      result += stream.getSize();
+      result += stream.getBufferSize();
     }
     result += treeWriter.estimateMemory();
     return result;
   }
 
   @Override
-  public void addUserMetadata(String name, ByteBuffer value) {
+  public synchronized void addUserMetadata(String name, ByteBuffer value) {
     userMetadata.put(name, ByteString.copyFrom(value));
   }
 
   @Override
   public void addRow(Object row) throws IOException {
-    treeWriter.write(row);
-    rowsInStripe += 1;
-    if (buildIndex) {
-      rowsInIndex += 1;
-
-      if (rowsInIndex >= rowIndexStride) {
-        createRowIndexEntry();
+    synchronized (this) {
+      treeWriter.write(row);
+      rowsInStripe += 1;
+      if (buildIndex) {
+        rowsInIndex += 1;
+
+        if (rowsInIndex >= rowIndexStride) {
+          createRowIndexEntry();
+        }
       }
     }
-    // once every 1000 rows, check the size to see if we should spill
-    if (rowsInStripe % 1000 == 0) {
-      checkMemory(memoryManager.getAllocationScale());
-    }
+    memoryManager.addedRow();
   }
 
   @Override
   public void close() throws IOException {
-    flushStripe();
-    int footerLength = writeFooter(rawWriter.getPos());
-    rawWriter.writeByte(writePostScript(footerLength));
-    rawWriter.close();
+    // remove us from the memory manager so that we don't get any callbacks
     memoryManager.removeWriter(path);
+    // actually close the file
+    synchronized (this) {
+      flushStripe();
+      int footerLength = writeFooter(rawWriter.getPos());
+      rawWriter.writeByte(writePostScript(footerLength));
+      rawWriter.close();
+    }
   }
 }
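
As the new WriterImpl class comment explains, checkMemory may arrive from whichever thread happens to trip the shared memory manager's row-count check, so both addRow and checkMemory take the writer's lock. The following is a minimal sketch of that locking pattern; the class, the assumed stripe size, and the stubbed methods are illustrative only.

    import java.io.IOException;

    // Sketch of the locking pattern only; not the real WriterImpl.
    class SketchWriter {
      private long rowsInStripe = 0;

      public void addRow(Object row) throws IOException {
        synchronized (this) {
          rowsInStripe += 1;   // writer state is mutated under the lock
        }
        // the shared memory manager is notified outside the writer's lock
      }

      // May be called from another writer's thread via the memory manager.
      public synchronized boolean checkMemory(double newScale) throws IOException {
        long limit = Math.round(100L * 1024 * 1024 * newScale); // assumed stripe size
        if (estimateStripeSize() > limit) {
          flushStripe();
          return true;         // report that a stripe was flushed
        }
        return false;
      }

      private long estimateStripeSize() { return rowsInStripe * 100; } // stub
      private void flushStripe() { rowsInStripe = 0; }                 // stub
    }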

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java Thu May  9 04:38:38 2013
@@ -44,7 +44,7 @@ public class ColumnAccessAnalyzer {
     for (TableScanOperator op : topOps.keySet()) {
       Table table = topOps.get(op);
       String tableName = table.getCompleteName();
-      List<FieldSchema> tableCols = table.getAllCols();
+      List<FieldSchema> tableCols = table.getCols();
       for (int i : op.getNeededColumnIDs()) {
         columnAccessInfo.add(tableName, tableCols.get(i).getName());
       }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Thu May  9 04:38:38 2013
@@ -1050,8 +1050,7 @@ public class PTFTranslator {
       }
       ColumnInfo cInfo = new ColumnInfo(wFnDef.getAlias(),
           TypeInfoUtils.getTypeInfoFromObjectInspector(wFnOI),
-          null,
-          false);
+          null, true, true);
       rr.putExpression(ast, cInfo);
     }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu May  9 04:38:38 2013
@@ -5167,10 +5167,11 @@ public class SemanticAnalyzer extends Ba
 
         if (field_schemas != null) {
           FieldSchema col = new FieldSchema();
-          if (nm[1] != null) {
-            col.setName(unescapeIdentifier(colInfo.getAlias()).toLowerCase()); // remove ``
-          } else {
+          if ("".equals(nm[0]) || nm[1] == null) {
+            // ast expression is not a valid column name for the table
             col.setName(colInfo.getInternalName());
+          } else {
+            col.setName(unescapeIdentifier(colInfo.getAlias()).toLowerCase()); // remove ``
           }
           col.setType(colInfo.getType().getTypeName());
           field_schemas.add(col);
@@ -7385,6 +7386,10 @@ public class SemanticAnalyzer extends Ba
       for (ColumnInfo colInfo : rr.getColumnInfos()) {
         String name = colInfo.getInternalName();
         String[] tmp = rr.reverseLookup(name);
+        if ("".equals(tmp[0]) || tmp[1] == null) {
+          // ast expression is not a valid column name for the table
+          tmp[1] = colInfo.getInternalName();
+        }
         newRR.put(alias, tmp[1], colInfo);
       }
       opParseCtx.get(curr).setRowResolver(newRR);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Thu May  9 04:38:38 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;

Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java Thu May  9 04:38:38 2013
@@ -39,8 +39,8 @@ public class TestMemoryManager {
   private static final double ERROR = 0.000001;
 
   private static class NullCallback implements MemoryManager.Callback {
-    public void checkMemory(double newScale) {
-      // PASS
+    public boolean checkMemory(double newScale) {
+      return false;
     }
   }
 
@@ -120,17 +120,13 @@ public class TestMemoryManager {
       calls[i] = mock(MemoryManager.Callback.class);
       mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]);
     }
-    double[] spills = new double[]{0, 0, 0, 0, 0.8, 0.666666666667,
-                                   0.571428571429, 0.5, 0.444444444444,
-                                   0.4, 0, 0.333333333333, 0, 0.285714285714,
-                                   0, 0.25, 0, 0.222222222222, 0, 0.2};
-    for(int spill=0; spill < spills.length; ++spill) {
-      if (spills[spill] != 0) {
-        for(int call=0; call < spill + 1; ++call) {
-          verify(calls[call], times(1))
-            .checkMemory(doubleThat(closeTo(spills[spill], ERROR)));
-        }
-      }
+    // add enough rows to get the memory manager to check the limits
+    for(int i=0; i < 10000; ++i) {
+      mgr.addedRow();
+    }
+    for(int call=0; call < calls.length; ++call) {
+      verify(calls[call], times(2))
+          .checkMemory(doubleThat(closeTo(0.2, ERROR)));
     }
   }
 }