Posted to commits@hive.apache.org by kh...@apache.org on 2014/06/09 19:01:59 UTC

svn commit: r1601441 - in /hive/trunk/hbase-handler/src: java/org/apache/hadoop/hive/hbase/ test/queries/negative/ test/queries/positive/ test/results/negative/ test/results/positive/

Author: khorgath
Date: Mon Jun  9 17:01:59 2014
New Revision: 1601441

URL: http://svn.apache.org/r1601441
Log:
HIVE-6473 : Allow writing HFiles via HBaseStorageHandler table (Nick Dimiduk via Sushanth Sowmyan)

Added:
    hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
    hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
    hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
    hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1601441&r1=1601440&r2=1601441&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Jun  9 17:01:59 2014
@@ -262,7 +262,10 @@ public class HBaseStorageHandler extends
 
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
-    return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class;
+    if (isHBaseGenerateHFiles(jobConf)) {
+      return HiveHFileOutputFormat.class;
+    }
+    return HiveHBaseTableOutputFormat.class;
   }
 
   @Override
@@ -349,11 +352,31 @@ public class HBaseStorageHandler extends
       } //input job properties
     }
     else {
-      jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+      if (isHBaseGenerateHFiles(jobConf)) {
+        // only support bulk load when hfile.family.path has been specified.
+        // TODO: support detecting cf's from column mapping
+        // TODO: support loading into multiple CF's at a time
+        String path = HiveHFileOutputFormat.getFamilyPath(jobConf, tableProperties);
+        if (path == null || path.isEmpty()) {
+          throw new RuntimeException("Please set " + HiveHFileOutputFormat.HFILE_FAMILY_PATH + " to target location for HFiles");
+        }
+        // TODO: should call HiveHFileOutputFormat#setOutputPath
+        jobProperties.put("mapred.output.dir", path);
+      } else {
+        jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+      }
     } // output job properties
   }
 
   /**
+   * Return true when HBaseStorageHandler should generate HFiles instead of operating against
+   * the online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true.
+   */
+  public static boolean isHBaseGenerateHFiles(Configuration conf) {
+    return conf.getBoolean("hive.hbase.generatehfiles", false);
+  }
+
+  /**
    * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
    * if they are not already present in the jobConf.
    * @param jobConf Job configuration

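For readers skimming the hunks above, here is a minimal standalone sketch of how the two
configuration keys introduced by this patch fit together. The key and class names come from
the diff; everything else (the demo class, the example path) is hypothetical, and it assumes
only hadoop-common on the classpath:

    import org.apache.hadoop.conf.Configuration;

    public class GenerateHFilesFlagDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Both keys are defined by the patch above; the values are examples.
        conf.setBoolean("hive.hbase.generatehfiles", true);
        conf.set("hfile.family.path", "/tmp/hb_target/cf");
        // With the flag set, isHBaseGenerateHFiles(conf) returns true, so
        // getOutputFormatClass() picks HiveHFileOutputFormat and
        // configureTableJobProperties routes the family path into
        // mapred.output.dir instead of setting TableOutputFormat.OUTPUT_TABLE.
        System.out.println(conf.getBoolean("hive.hbase.generatehfiles", false)); // prints: true
      }
    }

Note that the check is a plain getBoolean with a false default, so the handler keeps writing
to the online table unless the flag is set explicitly.
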
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=1601441&r1=1601440&r2=1601441&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Mon Jun  9 17:01:59 2014
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.SortedMap;
@@ -27,10 +30,14 @@ import java.util.TreeMap;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -54,10 +61,9 @@ public class HiveHFileOutputFormat exten
     HFileOutputFormat implements
     HiveOutputFormat<ImmutableBytesWritable, KeyValue> {
 
-  private static final String HFILE_FAMILY_PATH = "hfile.family.path";
+  public static final String HFILE_FAMILY_PATH = "hfile.family.path";
 
-  static final Log LOG = LogFactory.getLog(
-    HiveHFileOutputFormat.class.getName());
+  static final Log LOG = LogFactory.getLog(HiveHFileOutputFormat.class.getName());
 
   private
   org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
@@ -70,6 +76,14 @@ public class HiveHFileOutputFormat exten
     }
   }
 
+  /**
+   * Retrieve the family path, first checking the JobConf, then the table properties.
+   * @return the family path or null if not specified.
+   */
+  public static String getFamilyPath(Configuration jc, Properties tableProps) {
+    return jc.get(HFILE_FAMILY_PATH, tableProps.getProperty(HFILE_FAMILY_PATH));
+  }
+
   @Override
   public RecordWriter getHiveRecordWriter(
     final JobConf jc,
@@ -79,8 +93,8 @@ public class HiveHFileOutputFormat exten
     Properties tableProperties,
     final Progressable progressable) throws IOException {
 
-    // Read configuration for the target path
-    String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH);
+    // Read configuration for the target path, first from jobconf, then from table properties
+    String hfilePath = getFamilyPath(jc, tableProperties);
     if (hfilePath == null) {
       throw new RuntimeException(
         "Please set " + HFILE_FAMILY_PATH + " to target location for HFiles");
@@ -129,20 +143,18 @@ public class HiveHFileOutputFormat exten
           if (abort) {
             return;
           }
-          // Move the region file(s) from the task output directory
-          // to the location specified by the user.  There should
-          // actually only be one (each reducer produces one HFile),
-          // but we don't know what its name is.
+          // Move the HFile(s) from the task output directory to the
+          // location specified by the user.
           FileSystem fs = outputdir.getFileSystem(jc);
           fs.mkdirs(columnFamilyPath);
           Path srcDir = outputdir;
           for (;;) {
             FileStatus [] files = fs.listStatus(srcDir);
             if ((files == null) || (files.length == 0)) {
-              throw new IOException("No files found in " + srcDir);
+              throw new IOException("No family directories found in " + srcDir);
             }
             if (files.length != 1) {
-              throw new IOException("Multiple files found in " + srcDir);
+              throw new IOException("Multiple family directories found in " + srcDir);
             }
             srcDir = files[0].getPath();
             if (srcDir.getName().equals(columnFamilyName)) {
@@ -165,10 +177,9 @@ public class HiveHFileOutputFormat exten
         }
       }
 
-      @Override
-      public void write(Writable w) throws IOException {
+      private void writeText(Text text) throws IOException {
         // Decompose the incoming text row into fields.
-        String s = ((Text) w).toString();
+        String s = text.toString();
         String [] fields = s.split("\u0001");
         assert(fields.length <= (columnMap.size() + 1));
         // First field is the row key.
@@ -196,11 +207,40 @@ public class HiveHFileOutputFormat exten
             valBytes);
           try {
             fileWriter.write(null, kv);
+          } catch (IOException e) {
+            LOG.error("Failed while writing row: " + s);
+            throw e;
           } catch (InterruptedException ex) {
             throw new IOException(ex);
           }
         }
       }
+
+      private void writePut(PutWritable put) throws IOException {
+        ImmutableBytesWritable row = new ImmutableBytesWritable(put.getPut().getRow());
+        SortedMap<byte[], List<Cell>> cells = put.getPut().getFamilyCellMap();
+        for (Map.Entry<byte[], List<Cell>> entry : cells.entrySet()) {
+          Collections.sort(entry.getValue(), new CellComparator());
+          for (Cell c : entry.getValue()) {
+            try {
+              fileWriter.write(row, KeyValueUtil.copyToNewKeyValue(c));
+            } catch (InterruptedException e) {
+              throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        if (w instanceof Text) {
+          writeText((Text) w);
+        } else if (w instanceof PutWritable) {
+          writePut((PutWritable) w);
+        } else {
+          throw new IOException("Unexpected writable " + w);
+        }
+      }
     };
   }
 

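One detail of writePut() above deserves a note: an HFile writer requires cells in sorted
order, which is why each family's cell list is sorted with CellComparator before the cells
are copied into KeyValues. A self-contained sketch of that ordering, assuming the HBase
0.98-era API this patch was written against (KeyValue implements Cell, and CellComparator
was still a concrete class; the demo class and values are hypothetical):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SortedCellsSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");
        byte[] cf = Bytes.toBytes("cf");
        List<Cell> cells = new ArrayList<Cell>();
        // Deliberately out of order: qualifier "b" added before "a".
        cells.add(new KeyValue(row, cf, Bytes.toBytes("b"), Bytes.toBytes("2")));
        cells.add(new KeyValue(row, cf, Bytes.toBytes("a"), Bytes.toBytes("1")));
        // Same comparator the patch uses in writePut(); after sorting,
        // "a" precedes "b", as the HFile writer requires.
        Collections.sort(cells, new CellComparator());
        for (Cell c : cells) {
          System.out.println(c);
        }
      }
    }
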
Added: hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q Mon Jun  9 17:01:59 2014
@@ -0,0 +1,10 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.generatehfiles = true;
+INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;

Added: hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q Mon Jun  9 17:01:59 2014
@@ -0,0 +1,23 @@
+-- -*- mode:sql -*-
+
+drop table if exists hb_target;
+
+-- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk');
+
+set hive.hbase.generatehfiles=true;
+set hfile.family.path=/tmp/hb_target/cf;
+
+-- this should produce three files in /tmp/hb_target/cf
+insert overwrite table hb_target select distinct key, value from src cluster by key;
+
+-- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hb_target/cf/* /tmp/blah/cf
+
+drop table hb_target;
+dfs -rmr /tmp/hb_target/cf;

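The positive test stops after producing HFiles under /tmp/hb_target/cf; the commit itself
does not load them into HBase. One plausible way to finish the job, sketched against the
HBase 0.98-era LoadIncrementalHFiles API (the table name matches the tblproperties above;
the driver class is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class CompleteBulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "positive_hbase_handler_bulk");
        // Point the loader at the parent of the cf/ family directory:
        // LoadIncrementalHFiles expects a directory of family subdirectories.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hb_target"), table);
        table.close();
      }
    }
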
Added: hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out Mon Jun  9 17:01:59 2014
@@ -0,0 +1,20 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+FAILED: RuntimeException Please set hfile.family.path to target location for HFiles

Added: hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out Mon Jun  9 17:01:59 2014
@@ -0,0 +1,52 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hb_target
+PREHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@hb_target
+PREHOOK: Output: default@hb_target
+POSTHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@hb_target
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####