Posted to commits@hive.apache.org by na...@apache.org on 2010/04/14 18:04:07 UTC

svn commit: r933997 - in /hadoop/hive/trunk: ./ hbase-handler/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/ hbase-handler/src/test/results/ ql/ ql/src/java/org/apache/hadoop/hive/ql/io/

Author: namit
Date: Wed Apr 14 16:04:06 2010
New Revision: 933997

URL: http://svn.apache.org/viewvc?rev=933997&view=rev
Log:
HIVE-1295. facilitate HBase bulk loads from Hive
(John Sichi via namit)
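
For context: HiveHFileOutputFormat (added below) lets an INSERT OVERWRITE
produce HFiles directly in a staging directory rather than writing through
the HBase region servers. The new hbase_bulk.m test exercises the full flow;
condensed from that test (table names and paths are illustrative), the write
side looks roughly like this, assuming hbsort was created with
HiveHFileOutputFormat and an 'hfile.family.path' table property:

    -- route rows through TotalOrderPartitioner so each reducer emits
    -- one key-ordered HFile under the configured family path
    set mapred.reduce.tasks=3;
    set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
    set total.order.partitioner.natural.order=false;
    set total.order.partitioner.path=/tmp/hbpartition.lst;

    insert overwrite table hbsort
    select distinct value, key, key+1
    from src
    cluster by value;

The partition file (/tmp/hbpartition.lst) is produced beforehand with the new
HiveNullValueSequenceFileOutputFormat, as the test shows.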


Added:
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
    hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m
    hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/hbase-handler/build.xml
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hadoop/hive/trunk/ql/build.xml

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Apr 14 16:04:06 2010
@@ -72,6 +72,9 @@ Trunk -  Unreleased
     HIVE-1300. alter table touch partition
     (Paul Yang via namit)
 
+    HIVE-1295. facilitate HBase bulk loads from Hive
+    (John Sichi via namit)
+
   IMPROVEMENTS
     HIVE-983. Function from_unixtime takes long.
     (Ning Zhang via zshao)

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Wed Apr 14 16:04:06 2010
@@ -218,8 +218,8 @@
     <pathelement location="${build.dir.hive}/cli/classes"/>
     <pathelement location="${build.dir.hive}/shims/classes"/>
     <pathelement location="${build.dir.hive}/hwi/classes"/>
-    <fileset dir="${basedir}" includes="lib/*.jar"/>
     <path refid="common-classpath"/>
+    <fileset dir="${basedir}" includes="lib/*.jar"/>
   </path>
 
   <target name="create-dirs">

Modified: hadoop/hive/trunk/hbase-handler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/build.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/build.xml (original)
+++ hadoop/hive/trunk/hbase-handler/build.xml Wed Apr 14 16:04:06 2010
@@ -74,7 +74,14 @@
               resultsDirectory="${hbase-handler.test.results.dir}" className="TestHBaseCliDriver"
               logFile="${test.log.dir}/testhbaseclidrivergen.log"
               logDirectory="${test.log.dir}/hbase-handler"/>
-
+    <qtestgen outputDirectory="${test.build.src}/org/apache/hadoop/hive/cli" 
+              templatePath="${ql.hbase.test.template.dir}" template="TestHBaseCliDriver.vm" 
+              queryDirectory="${hbase-handler.test.query.dir}" 
+              queryFile="hbase_bulk.m"
+              clusterMode="miniMR"
+              resultsDirectory="${hbase-handler.test.results.dir}" className="TestHBaseMinimrCliDriver"
+              logFile="${test.log.dir}/testhbaseminimrclidrivergen.log"
+              logDirectory="${test.log.dir}/hbase-handler"/>
   </target>
 
   <!-- override target jar because tests need to add hbase-handler jars,

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * HiveHBaseTableOutputFormat implements TableOutputFormat for HBase tables.
+ * HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
  */
 public class HiveHBaseTableOutputFormat extends 
     TableOutputFormat implements

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * HiveHFileOutputFormat implements HiveOutputFormat for HFile bulk
+ * loading.  Until HBASE-1861 is implemented, it can only be used
+ * for loading a table with a single column family.
+ */
+public class HiveHFileOutputFormat extends 
+    HFileOutputFormat implements
+    HiveOutputFormat<ImmutableBytesWritable, KeyValue> {
+
+  private static final String HFILE_FAMILY_PATH = "hfile.family.path";
+
+  private static final Log LOG = LogFactory.getLog(
+    HiveHFileOutputFormat.class.getName());
+
+  private org.apache.hadoop.mapreduce.RecordWriter<
+    ImmutableBytesWritable, KeyValue> getFileWriter(
+      org.apache.hadoop.mapreduce.TaskAttemptContext tac) throws IOException
+  {
+    try {
+      return super.getRecordWriter(tac);
+    } catch (InterruptedException ex) {
+      throw new IOException(ex);
+    }
+  }
+  
+  @Override
+  public RecordWriter getHiveRecordWriter(
+    final JobConf jc, final Path finalOutPath,
+    Class<? extends Writable> valueClass, boolean isCompressed,
+    Properties tableProperties,
+    final Progressable progressable) throws IOException {
+
+    // Read configuration for the target path
+    String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH);
+    if (hfilePath == null) {
+      throw new RuntimeException(
+        "Please set " + HFILE_FAMILY_PATH + " to target location for HFiles");
+    }
+
+    // Target path's last component is also the column family name.
+    final Path columnFamilyPath = new Path(hfilePath);
+    final String columnFamilyName = columnFamilyPath.getName();
+    final byte [] columnFamilyNameBytes = Bytes.toBytes(columnFamilyName);
+    final Job job = new Job(jc);
+    setCompressOutput(job, isCompressed);
+    setOutputPath(job, finalOutPath);
+
+    // Create the HFile writer
+    org.apache.hadoop.mapreduce.TaskAttemptContext tac =
+      new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
+        @Override
+        public void progress() {
+          progressable.progress();
+        }
+      };
+    final org.apache.hadoop.mapreduce.RecordWriter<
+      ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac);
+
+    // Individual columns are going to be pivoted to HBase cells,
+    // and for each row, they need to be written out in order
+    // of column name, so sort the column names now, creating a
+    // mapping to their column position.  However, the first
+    // column is interpreted as the row key.
+    String columnList = tableProperties.getProperty("columns");
+    String [] columnArray = columnList.split(",");
+    final SortedMap<byte [], Integer> columnMap =
+      new TreeMap<byte [], Integer>(Bytes.BYTES_COMPARATOR);
+    int i = 0;
+    for (String columnName : columnArray) {
+      if (i != 0) {
+        columnMap.put(Bytes.toBytes(columnName), i);
+      }
+      ++i;
+    }
+
+    return new RecordWriter() {
+      
+      @Override
+      public void close(boolean abort) throws IOException {
+        try {
+          fileWriter.close(null);
+          if (abort) {
+            return;
+          }
+          // Move the region file(s) from the task output directory
+          // to the location specified by the user.  There should
+          // actually only be one (each reducer produces one HFile),
+          // but we don't know what its name is.
+          Path outputdir = FileOutputFormat.getOutputPath(job);
+          FileSystem fs = outputdir.getFileSystem(jc);
+          fs.mkdirs(columnFamilyPath);
+          Path srcDir = new Path(outputdir, columnFamilyName);
+          for (FileStatus regionFile : fs.listStatus(srcDir)) {
+            fs.rename(
+              regionFile.getPath(),
+              new Path(
+                columnFamilyPath,
+                regionFile.getPath().getName()));
+          }
+          // Hive actually wants a file as task output (not a directory), so
+          // replace the empty directory with an empty file to keep it happy.
+          fs.delete(outputdir, true);
+          fs.createNewFile(outputdir);
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        // Decompose the incoming text row into fields.
+        String s = ((Text) w).toString();
+        String [] fields = s.split("\u0001");
+        assert(fields.length == (columnMap.size() + 1));
+        // First field is the row key.
+        byte [] rowKeyBytes = Bytes.toBytes(fields[0]);
+        // Remaining fields are cells addressed by column name within row.
+        for (Map.Entry<byte [], Integer> entry : columnMap.entrySet()) {
+          byte [] columnNameBytes = entry.getKey();
+          int iColumn = entry.getValue();
+          byte [] valBytes = Bytes.toBytes(fields[iColumn]);
+          KeyValue kv = new KeyValue(
+            rowKeyBytes,
+            columnFamilyNameBytes,
+            columnNameBytes,
+            valBytes);
+          try {
+            fileWriter.write(null, kv);
+          } catch (InterruptedException ex) {
+            throw new IOException(ex);
+          }
+        }
+      }
+    };
+  }
+
+}
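
Usage note for the class above: getHiveRecordWriter requires the
hfile.family.path table property and reuses the last component of that path
as the HBase column family name; until HBASE-1861, only one column family
can be loaded this way. A table declared roughly as follows (mirroring the
new hbase_bulk.m test; the path is illustrative) is what drives it:

    -- the last path component ('cf') doubles as the column family name
    create table hbsort(key string, val string, val2 string)
    stored as
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
    TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');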

Added: hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_bulk.m Wed Apr 14 16:04:06 2010
@@ -0,0 +1,54 @@
+drop table hbsort;
+drop table hbpartition;
+
+-- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');
+
+-- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde 
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as 
+inputformat 
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat 
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions';
+
+-- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200';
+
+dfs -count /tmp/hbpartitions;
+dfs -cp /tmp/hbpartitions/* /tmp/hbpartition.lst;
+
+set mapred.reduce.tasks=3;
+set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+set total.order.partitioner.natural.order=false;
+set total.order.partitioner.path=/tmp/hbpartition.lst;
+
+-- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value;
+
+dfs -count /tmp/hbsort/cf;
+
+-- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort;
+drop table hbpartition;

Added: hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/results/hbase_bulk.m.out Wed Apr 14 16:04:06 2010
@@ -0,0 +1,121 @@
+PREHOOK: query: drop table hbsort
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbsort
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table hbpartition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbpartition
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf')
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- this is a dummy table used for controlling how the HFiles are
+-- created
+create table hbsort(key string, val string, val2 string)
+stored as
+INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
+TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbsort
+PREHOOK: query: -- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde 
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as 
+inputformat 
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat 
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- this is a dummy table used for controlling how the input file
+-- for TotalOrderPartitioner is created
+create external table hbpartition(part_break string)
+row format serde 
+'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
+stored as 
+inputformat 
+'org.apache.hadoop.mapred.TextInputFormat'
+outputformat 
+'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
+location '/tmp/hbpartitions'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbpartition
+PREHOOK: query: -- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbpartition
+POSTHOOK: query: -- this should produce one file in /tmp/hbpartitions, but we do not
+-- know what it will be called, so we will copy it to a well known
+-- filename /tmp/hbpartition.lst
+insert overwrite table hbpartition
+select distinct value
+from src
+where value='val_100' or value='val_200'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbpartition
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+           1            1                139 hdfs://localhost:59616/tmp/hbpartitions
+PREHOOK: query: -- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbsort
+POSTHOOK: query: -- this should produce three files in /tmp/hbsort/cf
+insert overwrite table hbsort
+select distinct value, key, key+1
+from src
+cluster by value
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbsort
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+           1            3              23267 hdfs://localhost:59616/tmp/hbsort/cf
+PREHOOK: query: -- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hbsort/cf/* /tmp/blah/cf
+
+drop table hbsort
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@hbsort
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: drop table hbpartition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table hbpartition
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@hbpartition
+POSTHOOK: Lineage: hbpartition.part_break SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: hbsort.val2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]

Modified: hadoop/hive/trunk/ql/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/build.xml?rev=933997&r1=933996&r2=933997&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/build.xml (original)
+++ hadoop/hive/trunk/ql/build.xml Wed Apr 14 16:04:06 2010
@@ -48,6 +48,7 @@
     <pathelement location="${common.jar}"/>
     <fileset dir="${hive.root}" includes="testlibs/*.jar"/>
     <fileset dir="${hadoop.root}/lib" includes="*.jar"/>
+    <fileset dir="${hadoop.root}/lib/jsp-2.1" includes="*.jar"/>
     <path refid="classpath"/>
   </path>
 

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=933997&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Wed Apr 14 16:04:06 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link HiveOutputFormat} that writes {@link SequenceFile}s with the
+ * content saved in the keys, and null in the values.
+ */
+public class HiveNullValueSequenceFileOutputFormat
+  extends SequenceFileOutputFormat
+  implements HiveOutputFormat<WritableComparable, Writable> {
+
+  private static final Writable NULL_WRITABLE = NullWritable.get();
+
+  private HiveKey keyWritable;
+  private boolean keyIsText;
+
+  @Override
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+
+    FileSystem fs = finalOutPath.getFileSystem(jc);
+    final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
+        fs, finalOutPath, HiveKey.class, NullWritable.class, isCompressed);
+
+    keyWritable = new HiveKey();
+    keyIsText = valueClass.equals(Text.class);
+    return new RecordWriter() {
+      public void write(Writable r) throws IOException {
+        if (keyIsText) {
+          Text text = (Text) r;
+          keyWritable.set(text.getBytes(), 0, text.getLength());
+        } else {
+          BytesWritable bw = (BytesWritable) r;
+          keyWritable.set(bw.getBytes(), 0, bw.getLength());
+        }
+        keyWritable.setHashCode(r.hashCode());
+        outStream.append(keyWritable, NULL_WRITABLE);
+      }
+
+      public void close(boolean abort) throws IOException {
+        outStream.close();
+      }
+    };
+  }
+
+}
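
Usage note for the class above: in this commit it is used to build the
TotalOrderPartitioner split file for the bulk-load flow; each row is written
into the sequence file key and the value is always NullWritable. In the
hbase_bulk.m test this is driven by an external table declared with this
output format, roughly as sketched here (paths are illustrative):

    create external table hbpartition(part_break string)
    row format serde 'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
    stored as
    inputformat 'org.apache.hadoop.mapred.TextInputFormat'
    outputformat 'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
    location '/tmp/hbpartitions';

    insert overwrite table hbpartition
    select distinct value from src
    where value='val_100' or value='val_200';

    -- copy the single output file to the fixed name the partitioner reads
    dfs -cp /tmp/hbpartitions/* /tmp/hbpartition.lst;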