Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/09 06:38:39 UTC
svn commit: r1480526 [1/2] - in /hive/branches/vectorization: ./ data/files/
hcatalog/build-support/ant/
hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/
hcatalog/src/test/ hcatalog/src/test/e2e/templeton/drivers/
hcatalog/src/tes...
Author: hashutosh
Date: Thu May 9 04:38:38 2013
New Revision: 1480526
URL: http://svn.apache.org/r1480526
Log:
Merged with trunk. Resolved conflicts in DynamicByteArray.java
Added:
hive/branches/vectorization/data/files/orc_create.txt
- copied unchanged from r1480524, hive/trunk/data/files/orc_create.txt
hive/branches/vectorization/ql/src/test/queries/clientpositive/ctas_colname.q
- copied unchanged from r1480524, hive/trunk/ql/src/test/queries/clientpositive/ctas_colname.q
hive/branches/vectorization/ql/src/test/results/clientpositive/ctas_colname.q.out
- copied unchanged from r1480524, hive/trunk/ql/src/test/results/clientpositive/ctas_colname.q.out
Removed:
hive/branches/vectorization/hcatalog/src/test/.gitignore
Modified:
hive/branches/vectorization/ (props changed)
hive/branches/vectorization/build-common.xml
hive/branches/vectorization/build.properties
hive/branches/vectorization/build.xml
hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml
hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java
hive/branches/vectorization/ql/src/test/queries/clientpositive/column_access_stats.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/orc_create.q
hive/branches/vectorization/ql/src/test/resources/orc-file-dump.out
hive/branches/vectorization/ql/src/test/results/clientpositive/column_access_stats.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/orc_create.q.out
hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/operation/Operation.java
hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
Propchange: hive/branches/vectorization/
------------------------------------------------------------------------------
Merged /hive/branches/branch-0.11:r1480385,1480458
Merged /hive/trunk:r1477794-1480524
Modified: hive/branches/vectorization/build-common.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build-common.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build-common.xml (original)
+++ hive/branches/vectorization/build-common.xml Thu May 9 04:38:38 2013
@@ -70,6 +70,10 @@
<property name="hadoop.opts.23" value="-D mapreduce.framework.name=local" />
<property name="hadoop.opts.20" value="" />
+ <condition property="test.halt.on.failure" value="no" else="yes">
+ <equals arg1="${test.continue.on.failure}" arg2="true"/>
+ </condition>
+
<target name="set-test-classpath">
<typedef name="distinctelementsclasspath" classname="org.apache.hadoop.hive.ant.DistinctElementsClassPath"
classpath="${build.dir.hive}/anttasks/hive-anttasks-${version}.jar:${build.ivy.lib.dir}/default/commons-collections-${commons-collections.version}.jar:${build.ivy.lib.dir}/default/commons-lang-${commons-lang.version}.jar"/>
@@ -429,7 +433,8 @@
<echo message="Test Classpath: ${hadoop.testcp}"/>
</then>
</if>
- <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+
+ <junit showoutput="${test.output}" printsummary="yes" haltonfailure="${test.halt.on.failure}"
fork="yes" maxmemory="${test.junit.maxmemory}" dir="${basedir}" timeout="${test.junit.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off">
<jvmarg value="-XX:+HeapDumpOnOutOfMemoryError"/>
Modified: hive/branches/vectorization/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.properties?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build.properties (original)
+++ hive/branches/vectorization/build.properties Thu May 9 04:38:38 2013
@@ -75,7 +75,7 @@ common.jar=${hadoop.root}/lib/commons-ht
# module names needed for build process
iterate.hive.all=ant,shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
iterate.hive.modules=shims,common,serde,metastore,ql,contrib,service,cli,jdbc,beeline,hwi,hbase-handler,testutils,hcatalog
-iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,metastore,odbc,serde,service,hcatalog
+iterate.hive.tests=ql,contrib,hbase-handler,hwi,jdbc,beeline,metastore,odbc,serde,service,hcatalog
iterate.hive.thrift=ql,service,metastore,serde
iterate.hive.protobuf=ql
iterate.hive.cpp=odbc
@@ -95,7 +95,7 @@ test.junit.timeout=43200000
# ant test -Dtest.junit.exclude="**/Test*CliDriver.class,**/TestPartitions.class"
test.junit.exclude=
-test.continue.on.failure=false
+test.continue.on.failure=true
test.submodule.exclude=
test.junit.maxmemory=512m
@@ -129,10 +129,6 @@ mvn.pom.dir=${build.dir.hive}/maven/poms
mvn.license.dir=${build.dir.hive}/maven/licenses
mvn.deploy.id=apache.snapshots.https
mvn.deploy.url=https://repository.apache.org/content/repositories/snapshots
-#
-# unit test Properties
-#
-failonerror=false
#
# Data nucleus repository - needed for jdo2-api-2.3-ec.jar download
Modified: hive/branches/vectorization/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/build.xml (original)
+++ hive/branches/vectorization/build.xml Thu May 9 04:38:38 2013
@@ -70,6 +70,10 @@
<equals arg1="${mvn.publish.repo}" arg2="staging"/>
</condition>
+ <condition property="test.halt.on.failure" value="no" else="yes">
+ <equals arg1="${test.continue.on.failure}" arg2="true"/>
+ </condition>
+
<taskdef resource="net/sf/antcontrib/antcontrib.properties">
<classpath>
<pathelement location="${hive.root}/testlibs/ant-contrib-1.0b3.jar"/>
@@ -293,7 +297,7 @@
<condition property="target.module" value="${module}" else="${iterate.hive.test}">
<isset property="module"/>
</condition>
- <for list="${target.module}" param="module">
+ <for keepgoing="${test.continue.on.failure}" list="${target.module}" param="module">
<sequential>
<ant antfile="@{module}/build.xml" target="test" inheritAll="false" inheritRefs="true">
<property name="build.dir.hive" location="${build.dir.hive}"/>
@@ -305,7 +309,7 @@
<target name="test-shims">
<echo message="Project: ${ant.project.name}"/>
- <subant target="test" failonerror="${failonerror}">
+ <subant target="test" failonerror="${test.halt.on.failure}">
<property name="hadoop.version" value="${hadoop.security.version}"/>
<property name="hadoop.security.version" value="${hadoop.security.version}"/>
<fileset dir="${hive.root}/shims" includes="build.xml"/>
@@ -506,8 +510,8 @@
<!-- Package the hcat stuff and pull it up into Hive's build dir -->
<ant antfile="${hive.root}/hcatalog/build.xml" target="package"
inheritAll="false"/>
- <mkdir dir="${build.dir.hive}/hcatalog"/>
- <copy todir="${build.dir.hive}/hcatalog">
+ <mkdir dir="${target.dir}/hcatalog"/>
+ <copy todir="${target.dir}/hcatalog">
<fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
</copy>
</target>
@@ -711,6 +715,7 @@
<tarfileset dir="${hive.root}" mode="664" prefix="${tar.final.name}/src"
excludes="${vcs.excludes}">
<exclude name="build/**" />
+ <exclude name="hcatalog/**/build/**" />
<exclude name="bin/**" />
<exclude name="**/py/**/*-remote" />
<exclude name="data/scripts/**" />
@@ -720,6 +725,7 @@
<tarfileset dir="${hive.root}" mode="755" prefix="${tar.final.name}/src"
excludes="${vcs.excludes}">
<exclude name="build/**" />
+ <exclude name="hcatalog/**/build/**" />
<include name="bin/**" />
<include name="**/py/**/*-remote" />
<include name="data/scripts/**" />
@@ -750,9 +756,6 @@
<exclude name="docs/**"/>
<exclude name="lib/py/**/*-remote"/>
</tarfileset>
- <tarfileset dir="${build.dir.hive}/hcatalog" mode="755" prefix="${bin.final.name}"
- excludes="${vcs.excludes}">
- </tarfileset>
</param.listofitems>
</macro_tar>
</target>
Modified: hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml (original)
+++ hive/branches/vectorization/hcatalog/build-support/ant/checkstyle.xml Thu May 9 04:38:38 2013
@@ -31,6 +31,7 @@
<checkstyle classpathref="checkstyle.class.path"
config="${path.to.basedir}/build-support/checkstyle/coding_style.xml">
<fileset dir="${basedir}">
+ <exclude name="**/.*"/>
<exclude name="**/build/**"/>
<exclude name=".idea/**"/>
<exclude name="historical/**"/>
Modified: hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/branches/vectorization/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Thu May 9 04:38:38 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
public class NotificationListener extends MetaStoreEventListener {
private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
- protected Session session;
protected Connection conn;
private static MessageFactory messageFactory = MessageFactory.getInstance();
+ public static final int NUM_RETRIES = 1;
+ private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+ private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+ protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+ @Override
+ protected Session initialValue() {
+ try {
+ return createSession();
+ } catch (Exception e) {
+ LOG.error("Couldn't create JMS Session", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (get() != null) {
+ try {
+ get().close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS session, ignored error", e);
+ }
+ }
+ super.remove();
+ }
+ };
/**
* Create message bus connection and session in constructor.
*/
public NotificationListener(final Configuration conf) {
-
super(conf);
- createConnection();
+ testAndCreateConnection();
}
private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
// Subscriber can get notification about drop of a database in HCAT
// by listening on a topic named "HCAT" and message selector string
// as "HCAT_EVENT = HCAT_DROP_DATABASE"
- if (dbEvent.getStatus()) {
+ if (dbEvent.getStatus()) {
String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
}
@@ -216,7 +241,7 @@ public class NotificationListener extend
}
}
- private String getTopicPrefix(HiveConf conf) {
+ private String getTopicPrefix(Configuration conf) {
return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
}
@@ -253,86 +278,102 @@ public class NotificationListener extend
* @param topicName is the name on message broker on which message is sent.
*/
protected void send(HCatEventMessage hCatEventMessage, String topicName) {
- try {
- if(null == session){
- // this will happen, if we never able to establish a connection.
- createConnection();
- if (null == session){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
- }
- }
-
- Destination topic = getTopic(topicName);
+ send(hCatEventMessage, topicName, NUM_RETRIES);
+ }
- if (null == topic){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
+ /**
+ * @param hCatEventMessage The HCatEventMessage being sent over JMS. This method is thread-safe.
+ * @param topicName the topic name on the message broker to which the message is sent.
+ * @param retries the number of retry attempts
+ */
+ protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+ try {
+ if (session.get() == null) {
+ // Need to reconnect
+ throw new JMSException("Invalid JMS session");
}
-
- MessageProducer producer = session.createProducer(topic);
- Message msg = session.createTextMessage(hCatEventMessage.toString());
+ Destination topic = createTopic(topicName);
+ Message msg = session.get().createTextMessage(hCatEventMessage.toString());
msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+ MessageProducer producer = createProducer(topic);
producer.send(msg);
// Message must be transacted before we return.
- session.commit();
- }
- catch(Exception e){
- // Gobble up the exception. Message delivery is best effort.
- LOG.error("Failed to send message on topic: " + topicName +
- " event: " + hCatEventMessage.getEventType(), e);
+ session.get().commit();
+ } catch (Exception e) {
+ if (retries >= 0) {
+ // this may happen if we were able to establish connection once, but it's no longer valid
+ LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+ testAndCreateConnection();
+ send(hCatEventMessage, topicName, retries - 1);
+ } else {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName +
+ " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+ }
}
}
/**
- * Get the topic object for the topicName, it also tries to reconnect
- * if the connection appears to be broken.
+ * Get the topic object for the topicName
*
* @param topicName The String identifying the message-topic.
* @return A {@link Topic} object corresponding to the specified topicName.
* @throws JMSException
*/
- protected Topic getTopic(final String topicName) throws JMSException {
- Topic topic;
+ protected Topic createTopic(final String topicName) throws JMSException {
+ return session.get().createTopic(topicName);
+ }
+
+ /**
+ * Does a health check on the connection by sending a dummy message.
+ * Recreates the connection if it is found to be bad.
+ * Also recreates the session.
+ */
+ protected synchronized void testAndCreateConnection() {
+ if (conn != null) {
+ // This method is reached when error occurs while sending msg, so the session must be bad
+ session.remove();
+ if (!isConnectionHealthy()) {
+ // I am the first thread to detect the error, cleanup old connection & reconnect
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS connection, ignored error", e);
+ }
+ conn = createConnection();
+ }
+ } else {
+ conn = createConnection();
+ }
try {
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise) {
- // this will happen if we were able to establish connection once, but its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
+ session.set(createSession());
+ } catch (JMSException e) {
+ LOG.error("Couldn't create JMS session, ignored the error", e);
}
- return topic;
}
- protected void createConnection() {
-
+ /**
+ * Create the JMS connection
+ * @return newly created JMS connection
+ */
+ protected Connection createConnection() {
+ LOG.info("Will create new JMS connection");
Context jndiCntxt;
+ Connection jmsConnection = null;
try {
jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
- .lookup("ConnectionFactory");
- Connection conn = connFac.createConnection();
- conn.start();
- conn.setExceptionListener(new ExceptionListener() {
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+ jmsConnection = connFac.createConnection();
+ jmsConnection.start();
+ jmsConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException jmse) {
- LOG.error(jmse.toString());
+ LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
}
});
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
} catch (NamingException e) {
LOG.error("JNDI error while setting up Message Bus connection. "
+ "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
} catch (Throwable t) {
LOG.error("Unable to connect to JMS provider", t);
}
+ return jmsConnection;
+ }
+
+ /**
+ * Send a dummy message to probe if the JMS connection is healthy
+ * @return true if connection is healthy, false otherwise
+ */
+ protected boolean isConnectionHealthy() {
+ try {
+ Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+ MessageProducer producer = createProducer(topic);
+ Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+ producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates a JMS session
+ * @return newly created JMS session
+ * @throws JMSException
+ */
+ protected Session createSession() throws JMSException {
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ return conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ /**
+ * Create a JMS producer
+ * @param topic
+ * @return newly created message producer
+ * @throws JMSException
+ */
+ protected MessageProducer createProducer(Destination topic) throws JMSException {
+ return session.get().createProducer(topic);
}
@Override
protected void finalize() throws Throwable {
- // Close the connection before dying.
- try {
- if (null != session)
- session.close();
- if (conn != null) {
+ if (conn != null) {
+ try {
conn.close();
+ } catch (Exception e) {
+ LOG.error("Couldn't close jms connection, ignored the error", e);
}
-
- } catch (Exception ignore) {
- LOG.info("Failed to close message bus connection.", ignore);
}
}
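
For context on the listener above: its class javadoc notes that subscribers can receive, for example, database-drop events by listening on the "HCAT" topic with the message selector HCAT_EVENT = HCAT_DROP_DATABASE. Below is a minimal subscriber sketch (not part of this commit); it assumes a jndi.properties on the classpath that resolves "ConnectionFactory" to the broker, the same lookup NotificationListener itself performs:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.InitialContext;

    public class HCatDropDatabaseSubscriber {
      public static void main(String[] args) throws Exception {
        // Same JNDI lookup that NotificationListener.createConnection() uses.
        InitialContext jndi = new InitialContext();
        ConnectionFactory factory =
            (ConnectionFactory) jndi.lookup("ConnectionFactory");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // "HCAT" is the default topic prefix; the selector matches the
        // HCAT_EVENT string property that send() sets on every message.
        Topic topic = session.createTopic("HCAT");
        MessageConsumer consumer =
            session.createConsumer(topic, "HCAT_EVENT = 'HCAT_DROP_DATABASE'");
        Message msg = consumer.receive();
        if (msg instanceof TextMessage) {
          System.out.println("Drop-database event: " + ((TextMessage) msg).getText());
        }
        conn.close();
      }
    }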
Modified: hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/branches/vectorization/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Thu May 9 04:38:38 2013
@@ -699,7 +699,9 @@ sub compare
}
- if (defined $testCmd->{'check_job_created'} || defined $testCmd->{'check_job_complete'}) {
+ if ( (defined $testCmd->{'check_job_created'})
+ || (defined $testCmd->{'check_job_complete'})
+ || (defined $testCmd->{'check_job_exit_value'}) ) {
my $jobid = $json_hash->{'id'};
if (!defined $jobid) {
print $log "$0::$subName INFO check failed: "
@@ -714,7 +716,7 @@ sub compare
. "jobresult not defined ";
$result = 0;
}
- if ($testCmd->{'check_job_complete'}) {
+ if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
my $jobComplete;
my $NUM_RETRIES = 60;
my $SLEEP_BETWEEN_RETRIES = 5;
@@ -736,11 +738,21 @@ sub compare
$result = 0;
} else {
# job has completed, check the runState value
- my $runState = $res_hash->{'status'}->{'runState'};
- my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
- if ( (!defined $runState) || $runState ne $runStateVal) {
- print $log "check_job_complete failed. got runState $runState, expected $runStateVal";
- $result = 0;
+ if (defined($testCmd->{'check_job_complete'})) {
+ my $runState = $res_hash->{'status'}->{'runState'};
+ my $runStateVal = $self->getRunStateNum($testCmd->{'check_job_complete'});
+ if ( (!defined $runState) || $runState ne $runStateVal) {
+ print $log "check_job_complete failed. got runState $runState, expected $runStateVal";
+ $result = 0;
+ }
+ }
+ if (defined($testCmd->{'check_job_exit_value'})) {
+ my $exitValue = $res_hash->{'exitValue'};
+ my $expectedExitValue = $testCmd->{'check_job_exit_value'};
+ if ( (!defined $exitValue) || $exitValue ne $expectedExitValue) {
+ print $log "check_job_exit_value failed. got exitValue $exitValue, expected $expectedExitValue";
+ $result = 0;
+ }
}
}
}
Modified: hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/branches/vectorization/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Thu May 9 04:38:38 2013
@@ -49,6 +49,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -107,6 +108,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
]
@@ -128,6 +130,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'FAILURE',
+ 'check_job_exit_value' => 8,
'check_call_back' => 1,
},
{
@@ -140,7 +143,8 @@ $cfg =
#results
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -154,6 +159,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -167,6 +173,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -181,6 +188,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -197,6 +205,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -212,6 +221,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -257,6 +267,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'FAILURE',
+ 'check_job_exit_value' => 11,
},
@@ -271,6 +282,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -285,6 +297,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
{
@@ -298,6 +311,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -312,6 +326,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -326,6 +341,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -340,6 +356,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -356,6 +373,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
},
Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java Thu May 9 04:38:38 2013
@@ -64,11 +64,14 @@ public class HiveAlterHandler implements
throw new InvalidOperationException("New table is invalid: " + newt);
}
- if (!MetaStoreUtils.validateName(newt.getTableName())
- || !MetaStoreUtils.validateTblColumns(newt.getSd().getCols())) {
+ if (!MetaStoreUtils.validateName(newt.getTableName())) {
throw new InvalidOperationException(newt.getTableName()
+ " is not a valid object name");
}
+ String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
+ if (validate != null) {
+ throw new InvalidOperationException("Invalid column " + validate);
+ }
Path srcPath = null;
FileSystem srcFs = null;
Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu May 9 04:38:38 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.Type;
@@ -1003,20 +1004,32 @@ public class HiveMetaStore extends Thrif
throws AlreadyExistsException, MetaException,
InvalidObjectException, NoSuchObjectException {
- if (!MetaStoreUtils.validateName(tbl.getTableName())
- || !MetaStoreUtils.validateTblColumns(tbl.getSd().getCols())
- || (tbl.getPartitionKeys() != null && !MetaStoreUtils
- .validateTblColumns(tbl.getPartitionKeys()))
- || !MetaStoreUtils.validateSkewedColNames(
- (null == tbl.getSd().getSkewedInfo()) ?
- null : tbl.getSd().getSkewedInfo().getSkewedColNames())
- || !MetaStoreUtils.validateSkewedColNamesSubsetCol(
- (null == tbl.getSd().getSkewedInfo()) ?
- null : tbl.getSd().getSkewedInfo().getSkewedColNames(),
- tbl.getSd().getCols())) {
+ if (!MetaStoreUtils.validateName(tbl.getTableName())) {
throw new InvalidObjectException(tbl.getTableName()
+ " is not a valid object name");
}
+ String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
+ if (validate != null) {
+ throw new InvalidObjectException("Invalid column " + validate);
+ }
+ if (tbl.getPartitionKeys() != null) {
+ validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
+ if (validate != null) {
+ throw new InvalidObjectException("Invalid partition column " + validate);
+ }
+ }
+ SkewedInfo skew = tbl.getSd().getSkewedInfo();
+ if (skew != null) {
+ validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
+ if (validate != null) {
+ throw new InvalidObjectException("Invalid skew column " + validate);
+ }
+ validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
+ skew.getSkewedColNames(), tbl.getSd().getCols());
+ if (validate != null) {
+ throw new InvalidObjectException("Invalid skew column " + validate);
+ }
+ }
Path tblPath = null;
boolean success = false, madeDir = false;
Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Thu May 9 04:38:38 2013
@@ -352,16 +352,16 @@ public class MetaStoreUtils {
return false;
}
- static public boolean validateTblColumns(List<FieldSchema> cols) {
+ static public String validateTblColumns(List<FieldSchema> cols) {
for (FieldSchema fieldSchema : cols) {
if (!validateName(fieldSchema.getName())) {
- return false;
+ return "name: " + fieldSchema.getName();
}
if (!validateColumnType(fieldSchema.getType())) {
- return false;
+ return "type: " + fieldSchema.getType();
}
}
- return true;
+ return null;
}
static void throwExceptionIfIncompatibleColTypeChange(
@@ -434,22 +434,22 @@ public class MetaStoreUtils {
return true;
}
- public static boolean validateSkewedColNames(List<String> cols) {
+ public static String validateSkewedColNames(List<String> cols) {
if (null == cols) {
- return true;
+ return null;
}
for (String col : cols) {
if (!validateName(col)) {
- return false;
+ return col;
}
}
- return true;
+ return null;
}
- public static boolean validateSkewedColNamesSubsetCol(List<String> skewedColNames,
+ public static String validateSkewedColNamesSubsetCol(List<String> skewedColNames,
List<FieldSchema> cols) {
if (null == skewedColNames) {
- return true;
+ return null;
}
List<String> colNames = new ArrayList<String>();
for (FieldSchema fieldSchema : cols) {
@@ -459,7 +459,10 @@ public class MetaStoreUtils {
List<String> copySkewedColNames = new ArrayList<String>(skewedColNames);
// remove valid columns
copySkewedColNames.removeAll(colNames);
- return (copySkewedColNames.size() > 0) ? false : true;
+ if (copySkewedColNames.isEmpty()) {
+ return null;
+ }
+ return copySkewedColNames.toString();
}
public static String getListType(String t) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Thu May 9 04:38:38 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
@@ -532,11 +533,14 @@ public class HiveHistory {
return null;
}
+
+ public void closeStream() {
+ IOUtils.cleanup(LOG, histStream);
+ }
+
@Override
public void finalize() throws Throwable {
- if (histStream !=null){
- histStream.close();
- }
+ closeStream();
super.finalize();
}
}
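
The closeStream() change above swaps a bare close() for Hadoop's IOUtils.cleanup(Log, Closeable...), which null-checks each argument and logs rather than propagates any close failure, so finalize() can never throw. The idiom in isolation (a sketch, not code from this commit; the file path is made up):

    import java.io.Closeable;
    import java.io.FileOutputStream;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.IOUtils;

    public class QuietCloseSketch {
      private static final Log LOG = LogFactory.getLog(QuietCloseSketch.class);

      public static void main(String[] args) throws Exception {
        Closeable stream = new FileOutputStream("/tmp/hive_history.sketch");
        // Closes the stream; a failure is logged through LOG instead of thrown.
        // Null arguments are skipped, so no null-check is needed at call sites.
        IOUtils.cleanup(LOG, stream);
      }
    }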
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu May 9 04:38:38 2013
@@ -140,7 +140,200 @@ import org.apache.hadoop.util.Reflection
* </ul>
* </li>
* </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
*
+ * rcfile ::=
+ * <file-header>
+ * <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ * <file-version-header>
+ * <file-key-class-name> (only exists if version is seq6)
+ * <file-value-class-name> (only exists if version is seq6)
+ * <file-is-compressed>
+ * <file-is-block-compressed> (only exists if version is seq6)
+ * [<file-compression-codec-class>]
+ * <file-header-metadata>
+ * <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ * | Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ * <rowgroup-header>
+ * <rowgroup-key-data>
+ * <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ * [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ * <rowgroup-record-length>
+ * <rowgroup-key-length>
+ * <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ * <rowgroup-num-rows>
+ * <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ * <column-buffer-length>
+ * <column-buffer-uncompressed-length>
+ * <column-key-buffer-length>
+ * <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-value-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
*/
public class RCFile {
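
One subtlety in the grammar added above is the key-buffer run-length scheme ("1,1,1,2 becomes 1,~2,2"). A sketch of that encoding exactly as the comments describe it (hypothetical helper, not the actual RCFile writer code):

    import java.util.ArrayList;
    import java.util.List;

    public class KeyBufferRle {
      // A run of N equal lengths is written as the literal length followed by
      // ~(N-1), the bitwise complement of the number of repetitions.
      static List<Integer> encode(int[] fieldLengths) {
        List<Integer> out = new ArrayList<Integer>();
        int i = 0;
        while (i < fieldLengths.length) {
          int run = 1;
          while (i + run < fieldLengths.length
              && fieldLengths[i + run] == fieldLengths[i]) {
            run++;
          }
          out.add(fieldLengths[i]);       // the literal length
          if (run > 1) {
            out.add(~(run - 1));          // complement of the repeat count
          }
          i += run;
        }
        return out;
      }

      public static void main(String[] args) {
        // Lengths 1,1,1,2 encode as 1,~2,2; since ~2 == -3 in two's
        // complement, this prints [1, -3, 2].
        System.out.println(encode(new int[]{1, 1, 1, 2}));
      }
    }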
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Thu May 9 04:38:38 2013
@@ -103,7 +103,7 @@ final class DynamicByteArray {
* @param value the array to copy from
* @param valueOffset the first location to copy from value
* @param valueLength the number of bytes to copy from value
- * @return
+ * @return the offset of the start of the value
*/
public int add(byte[] value, int valueOffset, int valueLength) {
int i = length / chunkSize;
@@ -293,4 +293,11 @@ final class DynamicByteArray {
}
return result;
}
+
+ /**
+ * Get the size of the buffers.
+ */
+ public long getSizeInBytes() {
+ return initializedChunks * chunkSize;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java Thu May 9 04:38:38 2013
@@ -135,5 +135,8 @@ final class DynamicIntArray {
return sb.toString();
}
+ public int getSizeInBytes() {
+ return 4 * initializedChunks * chunkSize;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Thu May 9 04:38:38 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,19 +35,25 @@ import java.util.Map;
* dynamic partitions, it is easy to end up with many writers in the same task.
* By managing the size of each allocation, we try to cut down the size of each
* allocation and keep the task from running out of memory.
+ *
+ * This class is thread safe and uses synchronization around the shared state
+ * to prevent race conditions.
*/
class MemoryManager {
+
+ private static final Log LOG = LogFactory.getLog(MemoryManager.class);
+
/**
- * How much does the pool need to change between notifications?
+ * How often should we check the memory sizes? Measured in rows added
+ * to all of the writers.
*/
- private static final double NOTIFICATION_FACTOR = 1.1;
+ private static final int ROWS_BETWEEN_CHECKS = 5000;
private final long totalMemoryPool;
- private long notificationTrigger;
private final Map<Path, WriterInfo> writerList =
new HashMap<Path, WriterInfo>();
private long totalAllocation = 0;
private double currentScale = 1;
- private double lastNotificationScale = 1;
+ private int rowsAddedSinceCheck = 0;
private static class WriterInfo {
long allocation;
@@ -57,7 +65,13 @@ class MemoryManager {
}
public interface Callback {
- void checkMemory(double newScale) throws IOException;
+ /**
+ * The writer needs to check its memory usage
+ * @param newScale the current scale factor for memory allocations
+ * @return true if the writer was over the limit
+ * @throws IOException
+ */
+ boolean checkMemory(double newScale) throws IOException;
}
/**
@@ -70,22 +84,26 @@ class MemoryManager {
double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
getHeapMemoryUsage().getMax() * maxLoad);
- notificationTrigger = Math.round(totalMemoryPool * NOTIFICATION_FACTOR);
}
/**
- * Add a new writer's memory allocation to the pool
+ * Add a new writer's memory allocation to the pool. We use the path
+ * as a unique key to ensure that we don't get duplicates.
* @param path the file that is being written
* @param requestedAllocation the requested buffer size
*/
synchronized void addWriter(Path path, long requestedAllocation,
Callback callback) throws IOException {
WriterInfo oldVal = writerList.get(path);
+ // this should always be null, but we handle the case where the memory
+ // manager wasn't told that a writer is no longer in use and the task
+ // starts writing to the same path.
if (oldVal == null) {
oldVal = new WriterInfo(requestedAllocation, callback);
writerList.put(path, oldVal);
totalAllocation += requestedAllocation;
} else {
+ // handle a new writer that is writing to the same path
totalAllocation += requestedAllocation - oldVal.allocation;
oldVal.allocation = requestedAllocation;
oldVal.callback = callback;
@@ -125,6 +143,31 @@ class MemoryManager {
}
/**
+ * Give the memory manager an opportunity for doing a memory check.
+ * @throws IOException
+ */
+ synchronized void addedRow() throws IOException {
+ if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+ notifyWriters();
+ }
+ }
+
+ /**
+ * Notify all of the writers that they should check their memory usage.
+ * @throws IOException
+ */
+ private void notifyWriters() throws IOException {
+ LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+ for(WriterInfo writer: writerList.values()) {
+ boolean flushed = writer.callback.checkMemory(currentScale);
+ if (LOG.isDebugEnabled() && flushed) {
+ LOG.debug("flushed " + writer.toString());
+ }
+ }
+ rowsAddedSinceCheck = 0;
+ }
+
+ /**
* Update the currentScale based on the current allocation and pool size.
* This also updates the notificationTrigger.
* @param isAllocate is this an allocation?
@@ -135,21 +178,5 @@ class MemoryManager {
} else {
currentScale = (double) totalMemoryPool / totalAllocation;
}
- if (!isAllocate) {
- // ensure that we notify if we drop 10% from the high water mark
- notificationTrigger =
- Math.min(notificationTrigger,
- Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale));
- } else {
- // we've allocated a new writer, so check to see if we need to notify
- if (totalAllocation > notificationTrigger) {
- for(WriterInfo writer: writerList.values()) {
- writer.callback.checkMemory(currentScale);
- }
- // set the next notification trigger
- notificationTrigger =
- Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale);
- }
- }
}
}
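
The Callback change above is the writer-facing half of the new scheme: instead of a notification trigger based on pool growth, every writer is polled once per ROWS_BETWEEN_CHECKS rows and reports whether it flushed. A sketch of what an implementation might look like (hypothetical; estimateMemory() and flushStripe() are stand-ins, and the class is assumed to live in the same package since MemoryManager is package-private):

    import java.io.IOException;

    class SketchWriter implements MemoryManager.Callback {
      private final long requestedAllocation = 256L * 1024 * 1024;

      @Override
      public boolean checkMemory(double newScale) throws IOException {
        // When the pool is oversubscribed, newScale < 1 shrinks every
        // writer's effective budget proportionally.
        long limit = Math.round(requestedAllocation * newScale);
        if (estimateMemory() > limit) {
          flushStripe();  // release buffers by writing out the current stripe
          return true;    // lets MemoryManager log that a flush happened
        }
        return false;
      }

      private long estimateMemory() { return 0; }  // stand-in
      private void flushStripe() { }               // stand-in
    }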
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Thu May 9 04:38:38 2013
@@ -21,12 +21,16 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -58,8 +62,18 @@ final class OrcStruct implements Writabl
return fields.length;
}
+ /**
+ * Change the number of fields in the struct. No effect if the number of
+ * fields is the same. The old field values are copied to the new array.
+ * @param numFields the new number of fields
+ */
public void setNumFields(int numFields) {
- fields = new Object[numFields];
+ if (fields.length != numFields) {
+ Object[] oldFields = fields;
+ fields = new Object[numFields];
+ System.arraycopy(oldFields, 0, fields, 0,
+ Math.min(oldFields.length, numFields));
+ }
}
@Override
@@ -148,7 +162,7 @@ final class OrcStruct implements Writabl
}
}
- static class OrcStructInspector extends StructObjectInspector {
+ static class OrcStructInspector extends SettableStructObjectInspector {
private final List<StructField> fields;
OrcStructInspector(StructTypeInfo info) {
@@ -223,9 +237,52 @@ final class OrcStruct implements Writabl
public Category getCategory() {
return Category.STRUCT;
}
+
+ @Override
+ public Object create() {
+ return new OrcStruct(0);
+ }
+
+ @Override
+ public Object setStructFieldData(Object struct, StructField field,
+ Object fieldValue) {
+ OrcStruct orcStruct = (OrcStruct) struct;
+ int offset = ((Field) field).offset;
+ // if the offset is bigger than our current number of fields, grow it
+ if (orcStruct.getNumFields() <= offset) {
+ orcStruct.setNumFields(offset+1);
+ }
+ orcStruct.setFieldValue(offset, fieldValue);
+ return struct;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ } else if (o == this) {
+ return true;
+ } else {
+ List<StructField> other = ((OrcStructInspector) o).fields;
+ if (other.size() != fields.size()) {
+ return false;
+ }
+ for(int i = 0; i < fields.size(); ++i) {
+ StructField left = other.get(i);
+ StructField right = fields.get(i);
+ if (!(left.getFieldName().equals(right.getFieldName()) &&
+ left.getFieldObjectInspector().equals
+ (right.getFieldObjectInspector()))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
}
- static class OrcMapObjectInspector implements MapObjectInspector {
+ static class OrcMapObjectInspector
+ implements MapObjectInspector, SettableMapObjectInspector {
private final ObjectInspector key;
private final ObjectInspector value;
@@ -275,9 +332,45 @@ final class OrcStruct implements Writabl
public Category getCategory() {
return Category.MAP;
}
+
+ @Override
+ public Object create() {
+ return new HashMap<Object,Object>();
+ }
+
+ @Override
+ public Object put(Object map, Object key, Object value) {
+ ((Map) map).put(key, value);
+ return map;
+ }
+
+ @Override
+ public Object remove(Object map, Object key) {
+ ((Map) map).remove(key);
+ return map;
+ }
+
+ @Override
+ public Object clear(Object map) {
+ ((Map) map).clear();
+ return map;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ } else if (o == this) {
+ return true;
+ } else {
+ OrcMapObjectInspector other = (OrcMapObjectInspector) o;
+ return other.key.equals(key) && other.value.equals(value);
+ }
+ }
}
- static class OrcListObjectInspector implements ListObjectInspector {
+ static class OrcListObjectInspector
+ implements ListObjectInspector, SettableListObjectInspector {
private final ObjectInspector child;
OrcListObjectInspector(ListTypeInfo info) {
@@ -319,6 +412,43 @@ final class OrcStruct implements Writabl
public Category getCategory() {
return Category.LIST;
}
+
+ @Override
+ public Object create(int size) {
+ ArrayList<Object> result = new ArrayList<Object>(size);
+ for(int i = 0; i < size; ++i) {
+ result.add(null);
+ }
+ return result;
+ }
+
+ @Override
+ public Object set(Object list, int index, Object element) {
+ List l = (List) list;
+ for(int i=l.size(); i < index+1; ++i) {
+ l.add(null);
+ }
+ l.set(index, element);
+ return list;
+ }
+
+ @Override
+ public Object resize(Object list, int newSize) {
+ ((ArrayList) list).ensureCapacity(newSize);
+ return list;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ } else if (o == this) {
+ return true;
+ } else {
+ ObjectInspector other = ((OrcListObjectInspector) o).child;
+ return other.equals(child);
+ }
+ }
}
static ObjectInspector createObjectInspector(TypeInfo info) {
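
With the inspectors now implementing the Settable* interfaces, generic serde code can build ORC rows through the ObjectInspector API. A minimal sketch (hypothetical; it assumes the same package, since createObjectInspector above is package-private, and uses the standard TypeInfoUtils helper):

    package org.apache.hadoop.hive.ql.io.orc;

    import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hadoop.io.Text;

    public class SettableOrcSketch {
      public static void main(String[] args) {
        SettableStructObjectInspector oi = (SettableStructObjectInspector)
            OrcStruct.createObjectInspector(
                TypeInfoUtils.getTypeInfoFromTypeString("struct<name:string>"));
        Object row = oi.create();  // an OrcStruct with zero fields
        StructField name = oi.getAllStructFieldRefs().get(0);
        // setStructFieldData grows the field array on demand via setNumFields.
        oi.setStructFieldData(row, name, new Text("hcat"));
        System.out.println(oi.getStructFieldData(row, name));  // prints: hcat
      }
    }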
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java Thu May 9 04:38:38 2013
@@ -134,5 +134,25 @@ final class OrcUnion implements UnionObj
public Category getCategory() {
return Category.UNION;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ } else if (o == this) {
+ return true;
+ } else {
+ List<ObjectInspector> other = ((OrcUnionObjectInspector) o).children;
+ if (other.size() != children.size()) {
+ return false;
+ }
+ for(int i = 0; i < children.size(); ++i) {
+ if (!other.get(i).equals(children.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Thu May 9 04:38:38 2013
@@ -23,15 +23,40 @@ import java.nio.ByteBuffer;
class OutStream extends PositionedOutputStream {
interface OutputReceiver {
+ /**
+ * Output the given buffer to the final destination.
+ * @param buffer the buffer to output
+ * @throws IOException
+ */
void output(ByteBuffer buffer) throws IOException;
}
static final int HEADER_SIZE = 3;
private final String name;
private final OutputReceiver receiver;
+
+ /**
+ * Stores the uncompressed bytes that have been serialized, but not
+ * compressed yet. When this fills, we compress the entire buffer.
+ */
+ private ByteBuffer current = null;
+
+ /**
+ * Stores the compressed bytes until we have a full buffer and then outputs
+ * them to the receiver. If no compression is being done, this (and overflow)
+ * will always be null and the current buffer will be sent directly to the
+ * receiver.
+ */
private ByteBuffer compressed = null;
+
+ /**
+ * Since the compressed buffer may start with contents from previous
+ * compression blocks, we allocate an overflow buffer so that the
+ * output of the codec can be split between the two buffers. After the
+ * compressed buffer is sent to the receiver, the overflow buffer becomes
+ * the new compressed buffer.
+ */
private ByteBuffer overflow = null;
- private ByteBuffer current;
private final int bufferSize;
private final CompressionCodec codec;
private long compressedBytes = 0;
@@ -85,9 +110,11 @@ class OutStream extends PositionedOutput
}
}
+ /**
+ * Allocate a new output buffer if we are compressing.
+ */
private ByteBuffer getNewOutputBuffer() throws IOException {
- return ByteBuffer.allocate(bufferSize +
- (codec == null ? 0 : HEADER_SIZE));
+ return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
}
private void flip() throws IOException {
@@ -128,7 +155,8 @@ class OutStream extends PositionedOutput
private void spill() throws java.io.IOException {
// if there isn't anything in the current buffer, don't spill
- if (current == null || current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+ if (current == null ||
+ current.position() == (codec == null ? 0 : HEADER_SIZE)) {
return;
}
flip();
@@ -223,8 +251,18 @@ class OutStream extends PositionedOutput
}
@Override
- public long getSize() {
- return uncompressedBytes + compressedBytes;
+ public long getBufferSize() {
+ long result = 0;
+ if (current != null) {
+ result += current.capacity();
+ }
+ if (compressed != null) {
+ result += compressed.capacity();
+ }
+ if (overflow != null) {
+ result += overflow.capacity();
+ }
+ return result;
}
}
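
For illustration, a minimal standalone sketch of the current/compressed/overflow
handoff described in the new comments above: when a chunk of codec output does
not fit in the compressed buffer, the tail spills into the overflow buffer, the
full buffer is sent downstream, and overflow becomes the new compressed buffer.
The 16-byte buffer size and the send method are invented here, and the sketch
assumes a chunk never spills past a single overflow buffer.

    import java.nio.ByteBuffer;

    public class DoubleBufferSketch {
      private static final int BUFFER_SIZE = 16;
      private ByteBuffer compressed = ByteBuffer.allocate(BUFFER_SIZE);
      private ByteBuffer overflow = null;

      void output(byte[] chunk) {
        int fits = Math.min(chunk.length, compressed.remaining());
        compressed.put(chunk, 0, fits);
        if (fits < chunk.length) {
          // Split the chunk across the two buffers, then rotate them.
          overflow = ByteBuffer.allocate(BUFFER_SIZE);
          overflow.put(chunk, fits, chunk.length - fits);
          send(compressed);
          compressed = overflow;
          overflow = null;
        }
      }

      void send(ByteBuffer buf) {
        buf.flip();
        System.out.println("sent " + buf.remaining() + " bytes");
      }

      public static void main(String[] args) {
        DoubleBufferSketch s = new DoubleBufferSketch();
        s.output(new byte[10]);
        s.output(new byte[10]); // crosses the 16-byte boundary; 4 bytes spill
      }
    }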
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java Thu May 9 04:38:38 2013
@@ -21,6 +21,18 @@ import java.io.IOException;
import java.io.OutputStream;
abstract class PositionedOutputStream extends OutputStream {
+
+ /**
+ * Record the current position to the recorder.
+ * @param recorder the object that receives the position
+ * @throws IOException
+ */
abstract void getPosition(PositionRecorder recorder) throws IOException;
- abstract long getSize();
+
+ /**
+ * Get the amount of memory currently allocated to the buffers associated
+ * with this stream.
+ * @return the number of bytes used by buffers.
+ */
+ abstract long getBufferSize();
}
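
The rename from getSize to getBufferSize reflects a change in what is measured:
memory accounting cares about allocated capacity, not bytes written so far. A
tiny sketch of the distinction (class name invented):

    import java.nio.ByteBuffer;

    public class CapacitySketch {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put(new byte[10]);
        System.out.println(buf.position()); // 10   -> data written so far
        System.out.println(buf.capacity()); // 1024 -> memory actually held
      }
    }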
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java Thu May 9 04:38:38 2013
@@ -25,13 +25,11 @@ package org.apache.hadoop.hive.ql.io.orc
*/
abstract class RedBlackTree {
public static final int NULL = -1;
- private static final int DEFAULT_INITIAL_CAPACITY = 16 * 1024;
// Various values controlling the offset of the data within the array.
private static final int LEFT_OFFSET = 0;
private static final int RIGHT_OFFSET = 1;
- private static final int COUNT_OFFSET = 2;
- private static final int ELEMENT_SIZE = 3;
+ private static final int ELEMENT_SIZE = 2;
protected int size = 0;
private final DynamicIntArray data;
@@ -40,13 +38,6 @@ abstract class RedBlackTree {
private boolean wasAdd = false;
/**
- * Create a set with a default initial capacity.
- */
- public RedBlackTree() {
- data = new DynamicIntArray(DEFAULT_INITIAL_CAPACITY * ELEMENT_SIZE);
- }
-
- /**
* Create a set with the given initial capacity.
*/
public RedBlackTree(int initialCapacity) {
@@ -63,7 +54,6 @@ abstract class RedBlackTree {
size += 1;
setLeft(position, left, isRed);
setRight(position, right);
- setCount(position, 1);
return position;
}
@@ -109,18 +99,6 @@ abstract class RedBlackTree {
return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
}
- protected int getCount(int position) {
- return data.get(position * ELEMENT_SIZE + COUNT_OFFSET);
- }
-
- private void setCount(int position, int value) {
- data.set(position * ELEMENT_SIZE + COUNT_OFFSET, value);
- }
-
- private void incrementCount(int position, int value) {
- data.increment(position * ELEMENT_SIZE + COUNT_OFFSET, value);
- }
-
/**
* Set the left field of the given position.
* Note that we are storing the node color in the low bit of the left pointer.
@@ -200,7 +178,6 @@ abstract class RedBlackTree {
} else {
lastAdd = node;
wasAdd = false;
- incrementCount(node, 1);
return false;
}
@@ -322,5 +299,11 @@ abstract class RedBlackTree {
data.clear();
}
+ /**
+ * Get the buffer size in bytes.
+ */
+ public long getSizeInBytes() {
+ return data.getSizeInBytes();
+ }
}
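
For reference, a standalone sketch of the flat-array node layout and the
color-in-the-low-bit trick noted in the comments above. The patch stores these
values in a DynamicIntArray; a plain int[] is used here for brevity.

    public class FlatTreeSketch {
      static final int NULL = -1;
      static final int LEFT_OFFSET = 0;
      static final int ELEMENT_SIZE = 2;
      private final int[] data = new int[ELEMENT_SIZE * 16];

      // The left pointer is shifted up one bit; bit 0 carries the color.
      // Arithmetic shift preserves NULL: ((-1 << 1) | flag) >> 1 == -1.
      void setLeft(int position, int left, boolean isRed) {
        data[position * ELEMENT_SIZE + LEFT_OFFSET] =
            (left << 1) | (isRed ? 1 : 0);
      }

      int getLeft(int position) {
        return data[position * ELEMENT_SIZE + LEFT_OFFSET] >> 1;
      }

      boolean isRed(int position) {
        return (data[position * ELEMENT_SIZE + LEFT_OFFSET] & 1) == 1;
      }

      public static void main(String[] args) {
        FlatTreeSketch t = new FlatTreeSketch();
        t.setLeft(3, NULL, true);
        System.out.println(t.getLeft(3) + " red=" + t.isRed(3)); // -1 red=true
      }
    }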
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Thu May 9 04:38:38 2013
@@ -24,19 +24,16 @@ import java.io.OutputStream;
/**
* A red-black tree that stores strings. The strings are stored as UTF-8 bytes
- * and an offset/length for each entry.
+ * and an offset for each entry.
*/
class StringRedBlackTree extends RedBlackTree {
private final DynamicByteArray byteArray = new DynamicByteArray();
- private final DynamicIntArray keySizes = new DynamicIntArray();
+ private final DynamicIntArray keyOffsets;
private final Text newKey = new Text();
- public StringRedBlackTree() {
- // PASS
- }
-
public StringRedBlackTree(int initialCapacity) {
super(initialCapacity);
+ keyOffsets = new DynamicIntArray(initialCapacity);
}
public int add(String value) {
@@ -44,16 +41,22 @@ class StringRedBlackTree extends RedBlac
// if the key is new, add it to our byteArray and store the offset & length
if (add()) {
int len = newKey.getLength();
- keySizes.add(byteArray.add(newKey.getBytes(), 0, len));
- keySizes.add(len);
+ keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len));
}
return lastAdd;
}
@Override
protected int compareValue(int position) {
+ int start = keyOffsets.get(position);
+ int end;
+ if (position + 1 == keyOffsets.size()) {
+ end = byteArray.size();
+ } else {
+ end = keyOffsets.get(position+1);
+ }
return byteArray.compare(newKey.getBytes(), 0, newKey.getLength(),
- keySizes.get(2 * position), keySizes.get(2 * position + 1));
+ start, end - start);
}
/**
@@ -84,12 +87,6 @@ class StringRedBlackTree extends RedBlac
* @return the string's length in bytes
*/
int getLength();
-
- /**
- * Get the count for this key.
- * @return the number of times this key was added
- */
- int getCount();
}
/**
@@ -106,6 +103,8 @@ class StringRedBlackTree extends RedBlac
private class VisitorContextImpl implements VisitorContext {
private int originalPosition;
+ private int start;
+ private int end;
private final Text text = new Text();
public int getOriginalPosition() {
@@ -113,20 +112,26 @@ class StringRedBlackTree extends RedBlac
}
public Text getText() {
- byteArray.setText(text, keySizes.get(originalPosition * 2), getLength());
+ byteArray.setText(text, start, end - start);
return text;
}
public void writeBytes(OutputStream out) throws IOException {
- byteArray.write(out, keySizes.get(originalPosition * 2), getLength());
+ byteArray.write(out, start, end - start);
}
public int getLength() {
- return keySizes.get(originalPosition * 2 + 1);
+ return end - start;
}
- public int getCount() {
- return StringRedBlackTree.this.getCount(originalPosition);
+ void setPosition(int position) {
+ originalPosition = position;
+ start = keyOffsets.get(originalPosition);
+ if (position + 1 == keyOffsets.size()) {
+ end = byteArray.size();
+ } else {
+ end = keyOffsets.get(originalPosition + 1);
+ }
}
}
@@ -134,7 +139,7 @@ class StringRedBlackTree extends RedBlac
) throws IOException {
if (node != NULL) {
recurse(getLeft(node), visitor, context);
- context.originalPosition = node;
+ context.setPosition(node);
visitor.visit(context);
recurse(getRight(node), visitor, context);
}
@@ -155,7 +160,7 @@ class StringRedBlackTree extends RedBlac
public void clear() {
super.clear();
byteArray.clear();
- keySizes.clear();
+ keyOffsets.clear();
}
/**
@@ -170,7 +175,8 @@ class StringRedBlackTree extends RedBlac
* Calculate the approximate size in memory.
* @return the number of bytes used in storing the tree.
*/
- public long getByteSize() {
- return byteArray.size() + 5 * 4 * size();
+ public long getSizeInBytes() {
+ return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() +
+ super.getSizeInBytes();
}
}
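
The switch from keySizes (offset and length pairs) to keyOffsets relies on the
entries being packed back to back: entry i spans [offsets[i], offsets[i+1]),
and the last entry ends at the end of the byte pool. A standalone sketch of
that layout, using a StringBuilder as a stand-in for DynamicByteArray:

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetPoolSketch {
      private final StringBuilder pool = new StringBuilder();
      private final List<Integer> offsets = new ArrayList<Integer>();

      int add(String value) {
        offsets.add(pool.length()); // record where this entry starts
        pool.append(value);
        return offsets.size() - 1;
      }

      String get(int i) {
        int start = offsets.get(i);
        // The last entry's end is the end of the pool itself.
        int end = (i + 1 == offsets.size()) ? pool.length() : offsets.get(i + 1);
        return pool.substring(start, end);
      }

      public static void main(String[] args) {
        OffsetPoolSketch p = new OffsetPoolSketch();
        p.add("orc");
        p.add("hive");
        System.out.println(p.get(0) + " " + p.get(1)); // prints: orc hive
      }
    }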
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Thu May 9 04:38:38 2013
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -62,9 +64,15 @@ import com.google.protobuf.CodedOutputSt
* type of column. TreeWriters may have children TreeWriters that handle the
* sub-types. Each of the TreeWriters writes the column's data as a set of
* streams.
+ *
+ * This class is synchronized so that multi-threaded access is safe. In
+ * particular, because the MemoryManager is shared between writers, this class
+ * assumes that checkMemory may be called from a separate thread.
*/
class WriterImpl implements Writer, MemoryManager.Callback {
+ private static final Log LOG = LogFactory.getLog(WriterImpl.class);
+
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -154,10 +162,18 @@ class WriterImpl implements Writer, Memo
}
@Override
- public void checkMemory(double newScale) throws IOException {
- if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
- flushStripe();
+ public synchronized boolean checkMemory(double newScale) throws IOException {
+ long limit = (long) Math.round(stripeSize * newScale);
+ long size = estimateStripeSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+ limit);
+ }
+ if (size > limit) {
+ flushStripe();
+ return true;
}
+ return false;
}
/**
@@ -186,6 +202,18 @@ class WriterImpl implements Writer, Memo
}
/**
+ * Get the number of bytes in buffers that are allocated to this stream.
+ * @return number of bytes in buffers
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for(ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
* Flush the stream to the codec.
* @throws IOException
*/
@@ -214,12 +242,9 @@ class WriterImpl implements Writer, Memo
}
}
- /**
- * Get the size of compressed and uncompressed data in the stream's buffers.
- * @return the number of bytes in the buffers.
- */
- long getSize() {
- return outStream.getSize();
+ @Override
+ public String toString() {
+ return outStream.toString();
}
}
@@ -681,11 +706,12 @@ class WriterImpl implements Writer, Memo
}
private static class StringTreeWriter extends TreeWriter {
+ private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final PositionedOutputStream stringOutput;
private final RunLengthIntegerWriter lengthOutput;
private final RunLengthIntegerWriter rowOutput;
- private final RunLengthIntegerWriter countOutput;
- private final StringRedBlackTree dictionary = new StringRedBlackTree();
+ private final StringRedBlackTree dictionary =
+ new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
private final List<OrcProto.RowIndexEntry> savedRowIndex =
new ArrayList<OrcProto.RowIndexEntry>();
@@ -703,12 +729,6 @@ class WriterImpl implements Writer, Memo
OrcProto.Stream.Kind.LENGTH), false);
rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.DATA), false);
- if (writer.buildIndex()) {
- countOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DICTIONARY_COUNT), false);
- } else {
- countOutput = null;
- }
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
@@ -739,9 +759,6 @@ class WriterImpl implements Writer, Memo
context.writeBytes(stringOutput);
lengthOutput.write(context.getLength());
dumpOrder[context.getOriginalPosition()] = currentId++;
- if (countOutput != null) {
- countOutput.write(context.getCount());
- }
}
});
int length = rows.size();
@@ -770,9 +787,6 @@ class WriterImpl implements Writer, Memo
stringOutput.flush();
lengthOutput.flush();
rowOutput.flush();
- if (countOutput != null) {
- countOutput.flush();
- }
// reset all of the fields to be ready for the next stripe.
dictionary.clear();
rows.clear();
@@ -809,7 +823,7 @@ class WriterImpl implements Writer, Memo
@Override
long estimateMemory() {
- return rows.size() * 4 + dictionary.getByteSize();
+ return rows.getSizeInBytes() + dictionary.getSizeInBytes();
}
}
@@ -1434,40 +1448,43 @@ class WriterImpl implements Writer, Memo
private long estimateStripeSize() {
long result = 0;
for(BufferedStream stream: streams.values()) {
- result += stream.getSize();
+ result += stream.getBufferSize();
}
result += treeWriter.estimateMemory();
return result;
}
@Override
- public void addUserMetadata(String name, ByteBuffer value) {
+ public synchronized void addUserMetadata(String name, ByteBuffer value) {
userMetadata.put(name, ByteString.copyFrom(value));
}
@Override
public void addRow(Object row) throws IOException {
- treeWriter.write(row);
- rowsInStripe += 1;
- if (buildIndex) {
- rowsInIndex += 1;
-
- if (rowsInIndex >= rowIndexStride) {
- createRowIndexEntry();
+ synchronized (this) {
+ treeWriter.write(row);
+ rowsInStripe += 1;
+ if (buildIndex) {
+ rowsInIndex += 1;
+
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
}
}
- // once every 1000 rows, check the size to see if we should spill
- if (rowsInStripe % 1000 == 0) {
- checkMemory(memoryManager.getAllocationScale());
- }
+ memoryManager.addedRow();
}
@Override
public void close() throws IOException {
- flushStripe();
- int footerLength = writeFooter(rawWriter.getPos());
- rawWriter.writeByte(writePostScript(footerLength));
- rawWriter.close();
+ // remove us from the memory manager so that we don't get any callbacks
memoryManager.removeWriter(path);
+ // actually close the file
+ synchronized (this) {
+ flushStripe();
+ int footerLength = writeFooter(rawWriter.getPos());
+ rawWriter.writeByte(writePostScript(footerLength));
+ rawWriter.close();
+ }
}
}
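
A standalone sketch of the callback contract the patch moves to: writers report
each row to the shared manager via addedRow, the manager periodically asks every
writer to checkMemory, and the boolean return reports whether the writer flushed.
The check interval and scale computation here are invented placeholders, not the
real MemoryManager values.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ManagerSketch {
      interface Callback {
        // Returns true if the writer flushed a stripe in response.
        boolean checkMemory(double newScale) throws IOException;
      }

      private final List<Callback> writers = new ArrayList<Callback>();
      private long rows = 0;
      private static final int ROWS_BETWEEN_CHECKS = 5000; // placeholder

      void addWriter(Callback writer) {
        writers.add(writer);
      }

      // Called once per row instead of each writer polling itself.
      void addedRow() throws IOException {
        if (++rows % ROWS_BETWEEN_CHECKS == 0) {
          double scale = 1.0 / Math.max(1, writers.size()); // toy scale
          for (Callback writer : writers) {
            writer.checkMemory(scale);
          }
        }
      }
    }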
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessAnalyzer.java Thu May 9 04:38:38 2013
@@ -44,7 +44,7 @@ public class ColumnAccessAnalyzer {
for (TableScanOperator op : topOps.keySet()) {
Table table = topOps.get(op);
String tableName = table.getCompleteName();
- List<FieldSchema> tableCols = table.getAllCols();
+ List<FieldSchema> tableCols = table.getCols();
for (int i : op.getNeededColumnIDs()) {
columnAccessInfo.add(tableName, tableCols.get(i).getName());
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Thu May 9 04:38:38 2013
@@ -1050,8 +1050,7 @@ public class PTFTranslator {
}
ColumnInfo cInfo = new ColumnInfo(wFnDef.getAlias(),
TypeInfoUtils.getTypeInfoFromObjectInspector(wFnOI),
- null,
- false);
+ null, true, true);
rr.putExpression(ast, cInfo);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu May 9 04:38:38 2013
@@ -5167,10 +5167,11 @@ public class SemanticAnalyzer extends Ba
if (field_schemas != null) {
FieldSchema col = new FieldSchema();
- if (nm[1] != null) {
- col.setName(unescapeIdentifier(colInfo.getAlias()).toLowerCase()); // remove ``
- } else {
+ if ("".equals(nm[0]) || nm[1] == null) {
+ // AST expression is not a valid column name for a table
col.setName(colInfo.getInternalName());
+ } else {
+ col.setName(unescapeIdentifier(colInfo.getAlias()).toLowerCase()); // remove ``
}
col.setType(colInfo.getType().getTypeName());
field_schemas.add(col);
@@ -7385,6 +7386,10 @@ public class SemanticAnalyzer extends Ba
for (ColumnInfo colInfo : rr.getColumnInfos()) {
String name = colInfo.getInternalName();
String[] tmp = rr.reverseLookup(name);
+ if ("".equals(tmp[0]) || tmp[1] == null) {
+ // AST expression is not a valid column name for a table
+ tmp[1] = colInfo.getInternalName();
+ }
newRR.put(alias, tmp[1], colInfo);
}
opParseCtx.get(curr).setRowResolver(newRR);
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Thu May 9 04:38:38 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java?rev=1480526&r1=1480525&r2=1480526&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java Thu May 9 04:38:38 2013
@@ -39,8 +39,8 @@ public class TestMemoryManager {
private static final double ERROR = 0.000001;
private static class NullCallback implements MemoryManager.Callback {
- public void checkMemory(double newScale) {
- // PASS
+ public boolean checkMemory(double newScale) {
+ return false;
}
}
@@ -120,17 +120,13 @@ public class TestMemoryManager {
calls[i] = mock(MemoryManager.Callback.class);
mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]);
}
- double[] spills = new double[]{0, 0, 0, 0, 0.8, 0.666666666667,
- 0.571428571429, 0.5, 0.444444444444,
- 0.4, 0, 0.333333333333, 0, 0.285714285714,
- 0, 0.25, 0, 0.222222222222, 0, 0.2};
- for(int spill=0; spill < spills.length; ++spill) {
- if (spills[spill] != 0) {
- for(int call=0; call < spill + 1; ++call) {
- verify(calls[call], times(1))
- .checkMemory(doubleThat(closeTo(spills[spill], ERROR)));
- }
- }
+ // add enough rows to get the memory manager to check the limits
+ for(int i=0; i < 10000; ++i) {
+ mgr.addedRow();
+ }
+ for(int call=0; call < calls.length; ++call) {
+ verify(calls[call], times(2))
+ .checkMemory(doubleThat(closeTo(0.2, ERROR)));
}
}
}
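
The expected scale of 0.2 in the test above is consistent with the manager
scaling every writer by pool / (total requested allocation), assuming that
formula: with 20 writers each requesting pool/4, the total request is five
times the pool, so each writer is scaled to 1/5. A quick check (class name
invented):

    public class ScaleSketch {
      public static void main(String[] args) {
        double pool = 1.0;               // normalized pool size
        int writers = 20;                // as in the test above
        double requestedEach = pool / 4; // each writer asks for pool/4
        double scale = pool / (writers * requestedEach);
        System.out.println(scale);       // prints 0.2
      }
    }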