You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by na...@apache.org on 2010/04/14 18:04:07 UTC
svn commit: r933997 - in /hadoop/hive/trunk: ./ hbase-handler/
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/queries/ hbase-handler/src/test/results/ ql/
ql/src/java/org/apache/hadoop/hive/ql/io/
Author: namit
Date: Wed Apr 14 16:04:06 2010
New Revision: 933997
URL: http://svn.apache.org/viewvc?rev=933997&view=rev
Log:
HIVE-1295. facilitate HBase bulk loads from Hive
(John Sichi via namit)
Added:
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m
hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/build-common.xml
hadoop/hive/trunk/hbase-handler/build.xml
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
hadoop/hive/trunk/ql/build.xml
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Apr 14 16:04:06 2010
@@ -72,6 +72,9 @@ Trunk - Unreleased
HIVE-1300. alter table touch partition
(Paul Yang via namit)
+ HIVE-1295. facilitate HBase bulk loads from Hive
+ (John Sichi via namit)
+
IMPROVEMENTS
HIVE-983. Function from_unixtime takes long.
(Ning Zhang via zshao)
Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Wed Apr 14 16:04:06 2010
@@ -218,8 +218,8 @@
<pathelement location="${build.dir.hive}/cli/classes"/>
<pathelement location="${build.dir.hive}/shims/classes"/>
<pathelement location="${build.dir.hive}/hwi/classes"/>
- <fileset dir="${basedir}" includes="lib/*.jar"/>
<path refid="common-classpath"/>
+ <fileset dir="${basedir}" includes="lib/*.jar"/>
</path>
<target name="create-dirs">
Modified: hadoop/hive/trunk/hbase-handler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/build.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/build.xml (original)
+++ hadoop/hive/trunk/hbase-handler/build.xml Wed Apr 14 16:04:06 2010
@@ -74,7 +74,14 @@
resultsDirectory="${hbase-handler.test.results.dir}" className="TestHBaseCliDriver"
logFile="${test.log.dir}/testhbaseclidrivergen.log"
logDirectory="${test.log.dir}/hbase-handler"/>
-
+ <qtestgen outputDirectory="${test.build.src}/org/apache/hadoop/hive/cli"
+ templatePath="${ql.hbase.test.template.dir}" template="TestHBaseCliDriver.vm"
+ queryDirectory="${hbase-handler.test.query.dir}"
+ queryFile="hbase_bulk.m"
+ clusterMode="miniMR"
+ resultsDirectory="${hbase-handler.test.results.dir}" className="TestHBaseMinimrCliDriver"
+ logFile="${test.log.dir}/testhbaseminimrclidrivergen.log"
+ logDirectory="${test.log.dir}/hbase-handler"/>
</target>
<!-- override target jar because tests need to add hbase-handler jars,
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Progressable;
/**
- * HiveHBaseTableOutputFormat implements TableOutputFormat for HBase tables.
+ * HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
*/
public class HiveHBaseTableOutputFormat extends
TableOutputFormat implements
Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * HiveHFileOutputFormat implements HiveOutputFormat for HFile bulk
+ * loading. Until HBASE-1861 is implemented, it can only be used
+ * for loading a table with a single column family.
+ *
+ * <p>The target table must define the {@code hfile.family.path} table
+ * property; the last component of that path doubles as the column family
+ * name. Each incoming row is a Ctrl-A (0x01) delimited {@link Text}; the
+ * first field is the row key and every remaining field becomes one cell
+ * whose qualifier is the corresponding column name.
+ */
+public class HiveHFileOutputFormat extends
+    HFileOutputFormat implements
+    HiveOutputFormat<ImmutableBytesWritable, KeyValue> {
+
+  private static final String HFILE_FAMILY_PATH = "hfile.family.path";
+
+  private static final Log LOG = LogFactory.getLog(
+      HiveHFileOutputFormat.class.getName());
+
+  /**
+   * Obtains the writer from {@link HFileOutputFormat#getRecordWriter},
+   * adapting the new-API {@link InterruptedException} to an
+   * {@link IOException}.
+   *
+   * @param tac task attempt context handed to the underlying format
+   * @return the underlying HFile record writer
+   * @throws IOException if the writer cannot be created, or the thread is
+   *     interrupted while creating it (the interrupt flag is restored)
+   */
+  private org.apache.hadoop.mapreduce.RecordWriter<
+      ImmutableBytesWritable, KeyValue> getFileWriter(
+      org.apache.hadoop.mapreduce.TaskAttemptContext tac) throws IOException {
+    try {
+      return super.getRecordWriter(tac);
+    } catch (InterruptedException ex) {
+      // Preserve the interrupt for code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Builds the qualifier-to-field-position map for the non-key columns.
+   * Individual columns are pivoted to HBase cells, and for each row they
+   * must be written in column-name order, so the names are kept in a map
+   * sorted by {@link Bytes#BYTES_COMPARATOR}. Column 0 is the row key and
+   * is excluded.
+   *
+   * @param tableProperties table properties; must contain {@code columns}
+   *     as a comma-separated list of column names
+   * @return sorted map from column name bytes to field index (1-based
+   *     relative to the row key)
+   * @throws RuntimeException if {@code columns} is not set
+   */
+  private static SortedMap<byte [], Integer> buildColumnMap(
+      Properties tableProperties) {
+    String columnList = tableProperties.getProperty("columns");
+    if (columnList == null) {
+      throw new RuntimeException(
+          "Table property 'columns' is required to map fields to HBase cells");
+    }
+    String [] columnArray = columnList.split(",");
+    SortedMap<byte [], Integer> columnMap =
+        new TreeMap<byte [], Integer>(Bytes.BYTES_COMPARATOR);
+    // Start at 1: the first column is interpreted as the row key.
+    for (int i = 1; i < columnArray.length; ++i) {
+      columnMap.put(Bytes.toBytes(columnArray[i]), i);
+    }
+    return columnMap;
+  }
+
+  /**
+   * Creates a Hive record writer that pivots each delimited text row into
+   * HBase {@link KeyValue} cells written through {@link HFileOutputFormat}.
+   * On a non-aborted close, the produced file(s) are moved into the
+   * directory named by {@code hfile.family.path}.
+   *
+   * @param jc job configuration
+   * @param finalOutPath task output path, set as the job's output path
+   * @param valueClass not consulted; rows are cast to {@link Text}
+   * @param isCompressed passed to {@code setCompressOutput}
+   * @param tableProperties must contain {@code hfile.family.path} and
+   *     {@code columns}
+   * @param progressable receives {@code progress()} calls made on the task
+   *     attempt context
+   * @return writer for this task's rows
+   * @throws IOException if the HFile writer cannot be created
+   * @throws RuntimeException if a required table property is missing
+   */
+  @Override
+  public RecordWriter getHiveRecordWriter(
+      final JobConf jc, final Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties,
+      final Progressable progressable) throws IOException {
+
+    // Read configuration for the target path
+    String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH);
+    if (hfilePath == null) {
+      throw new RuntimeException(
+          "Please set " + HFILE_FAMILY_PATH + " to target location for HFiles");
+    }
+
+    // Target path's last component is also the column family name.
+    final Path columnFamilyPath = new Path(hfilePath);
+    final String columnFamilyName = columnFamilyPath.getName();
+    final byte [] columnFamilyNameBytes = Bytes.toBytes(columnFamilyName);
+    final Job job = new Job(jc);
+    setCompressOutput(job, isCompressed);
+    setOutputPath(job, finalOutPath);
+
+    // Create the HFile writer; progress is forwarded to Hive's reporter.
+    final org.apache.hadoop.mapreduce.TaskAttemptContext tac =
+        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
+          @Override
+          public void progress() {
+            progressable.progress();
+          }
+        };
+    final org.apache.hadoop.mapreduce.RecordWriter<
+        ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac);
+
+    final SortedMap<byte [], Integer> columnMap =
+        buildColumnMap(tableProperties);
+    // Row key plus one field per mapped column.
+    final int expectedFields = columnMap.size() + 1;
+
+    return new RecordWriter() {
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        try {
+          // Hand over the real context rather than null.
+          fileWriter.close(tac);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+          throw new IOException(ex);
+        }
+        if (abort) {
+          return;
+        }
+        // Move the region file(s) from the task output directory
+        // to the location specified by the user.  There should
+        // actually only be one (each reducer produces one HFile),
+        // but we don't know what its name is.
+        Path outputdir = FileOutputFormat.getOutputPath(job);
+        FileSystem fs = outputdir.getFileSystem(jc);
+        fs.mkdirs(columnFamilyPath);
+        Path srcDir = new Path(outputdir, columnFamilyName);
+        // srcDir may be absent (e.g. presumably when this task wrote no
+        // rows); depending on the Hadoop version, listStatus on a missing
+        // path returns null or throws, so check first.
+        FileStatus [] regionFiles =
+            fs.exists(srcDir) ? fs.listStatus(srcDir) : null;
+        if (regionFiles != null) {
+          for (FileStatus regionFile : regionFiles) {
+            Path src = regionFile.getPath();
+            Path dst = new Path(columnFamilyPath, src.getName());
+            // rename reports failure via its return value; don't lose HFiles.
+            if (!fs.rename(src, dst)) {
+              throw new IOException(
+                  "Failed to move HFile " + src + " to " + dst);
+            }
+          }
+        }
+        // Hive actually wants a file as task output (not a directory), so
+        // replace the empty directory with an empty file to keep it happy.
+        fs.delete(outputdir, true);
+        fs.createNewFile(outputdir);
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        // Decompose the incoming text row into fields.  The -1 limit keeps
+        // trailing empty fields, which split() would otherwise drop and
+        // leave the row short of columns.
+        String s = ((Text) w).toString();
+        String [] fields = s.split("\u0001", -1);
+        if (fields.length < expectedFields) {
+          throw new IOException("Expected " + expectedFields
+              + " fields per row but found " + fields.length);
+        }
+        assert(fields.length == expectedFields);
+        // First field is the row key.
+        byte [] rowKeyBytes = Bytes.toBytes(fields[0]);
+        // Remaining fields are cells addressed by column name within row,
+        // emitted in the sorted order of columnMap.
+        for (Map.Entry<byte [], Integer> entry : columnMap.entrySet()) {
+          byte [] columnNameBytes = entry.getKey();
+          int iColumn = entry.getValue();
+          byte [] valBytes = Bytes.toBytes(fields[iColumn]);
+          KeyValue kv = new KeyValue(
+              rowKeyBytes,
+              columnFamilyNameBytes,
+              columnNameBytes,
+              valBytes);
+          try {
+            fileWriter.write(null, kv);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ex);
+          }
+        }
+      }
+    };
+  }
+
+}
Added: hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m Wed Apr 14 16:04:06 2010
@@ -0,0 +1,54 @@
+drop table hbsort;
+drop table hbpartition;
+
+-- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');
+
+-- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as
+inputformat
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions';
+
+-- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200';
+
+dfs -count /tmp/hbpartitions;
+dfs -cp /tmp/hbpartitions/* /tmp/hbpartition.lst;
+
+set mapred.reduce.tasks=3;
+set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+set total.order.partitioner.natural.order=false;
+set total.order.partitioner.path=/tmp/hbpartition.lst;
+
+-- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value;
+
+dfs -count /tmp/hbsort/cf;
+
+-- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort;
+drop table hbpartition;
Added: hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out Wed Apr 14 16:04:06 2010
@@ -0,0 +1,121 @@
+PREHOOK: query: drop table hbsort
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbsort
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table hbpartition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbpartition
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf')
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbsort
+PREHOOK: query: -- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as
+inputformat
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as
+inputformat
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbpartition
+PREHOOK: query: -- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbpartition
+POSTHOOK: query: -- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbpartition
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ 1 1 139 hdfs://localhost:59616/tmp/hbpartitions
+PREHOOK: query: -- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbsort
+POSTHOOK: query: -- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbsort
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+ 1 3 23267 hdfs://localhost:59616/tmp/hbsort/cf
+PREHOOK: query: -- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@hbsort
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: drop table hbpartition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbpartition
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@hbpartition
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
Modified: hadoop/hive/trunk/ql/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/build.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/build.xml (original)
+++ hadoop/hive/trunk/ql/build.xml Wed Apr 14 16:04:06 2010
@@ -48,6 +48,7 @@
<pathelement location="${common.jar}"/>
<fileset dir="${hive.root}" includes="testlibs/*.jar"/>
<fileset dir="${hadoop.root}/lib" includes="*.jar"/>
+ <fileset dir="${hadoop.root}/lib/jsp-2.1" includes="*.jar"/>
<path refid="classpath"/>
</path>
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link HiveOutputFormat} that writes {@link SequenceFile}s with the
+ * content saved in the keys, and null in the values.
+ *
+ * <p>Each row's bytes are copied into a {@link HiveKey}: from a {@link Text}
+ * row when the value class is {@code Text}, otherwise from a
+ * {@link BytesWritable} row. Every record's value is {@link NullWritable}.
+ */
+public class HiveNullValueSequenceFileOutputFormat
+    extends SequenceFileOutputFormat
+    implements HiveOutputFormat<WritableComparable, Writable> {
+
+  private static final Writable NULL_WRITABLE = NullWritable.get();
+
+  /**
+   * Creates a writer appending {@code (HiveKey, NullWritable)} records to
+   * {@code finalOutPath}. All per-writer state is local to the returned
+   * writer, so several writers obtained from one format instance never
+   * share a key buffer or key-type flag.
+   *
+   * @param jc job configuration
+   * @param finalOutPath file to write
+   * @param valueClass {@code Text.class} if rows arrive as {@link Text};
+   *     any other class means rows are cast to {@link BytesWritable}
+   * @param isCompressed whether the sequence file should be compressed
+   * @param tableProperties not consulted
+   * @param progress not consulted
+   * @return writer for the given path
+   * @throws IOException if the sequence file cannot be created
+   */
+  @Override
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+
+    FileSystem fs = finalOutPath.getFileSystem(jc);
+    final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
+        fs, finalOutPath, HiveKey.class, NullWritable.class, isCompressed);
+
+    // Kept as locals (not instance fields) so that each writer owns its
+    // reusable key buffer and type flag.
+    final HiveKey keyWritable = new HiveKey();
+    final boolean keyIsText = valueClass.equals(Text.class);
+    return new RecordWriter() {
+      @Override
+      public void write(Writable r) throws IOException {
+        if (keyIsText) {
+          Text text = (Text) r;
+          keyWritable.set(text.getBytes(), 0, text.getLength());
+        } else {
+          BytesWritable bw = (BytesWritable) r;
+          keyWritable.set(bw.getBytes(), 0, bw.getLength());
+        }
+        // The key's hash code is taken from the incoming row object.
+        keyWritable.setHashCode(r.hashCode());
+        outStream.append(keyWritable, NULL_WRITABLE);
+      }
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        outStream.close();
+      }
+    };
+  }
+
+}