You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2014/06/09 19:01:59 UTC
svn commit: r1601441 - in /hive/trunk/hbase-handler/src:
java/org/apache/hadoop/hive/hbase/ test/queries/negative/
test/queries/positive/ test/results/negative/ test/results/positive/
Author: khorgath
Date: Mon Jun 9 17:01:59 2014
New Revision: 1601441
URL: http://svn.apache.org/r1601441
Log:
HIVE-6473 : Allow writing HFiles via HBaseStorageHandler table (Nick Dimiduk via Sushanth Sowmyan)
Added:
hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1601441&r1=1601440&r2=1601441&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Jun 9 17:01:59 2014
@@ -262,7 +262,10 @@ public class HBaseStorageHandler extends
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class;
+ if (isHBaseGenerateHFiles(jobConf)) {
+ return HiveHFileOutputFormat.class;
+ }
+ return HiveHBaseTableOutputFormat.class;
}
@Override
@@ -349,11 +352,31 @@ public class HBaseStorageHandler extends
} //input job properties
}
else {
- jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+ if (isHBaseGenerateHFiles(jobConf)) {
+ // only support bulkload when a hfile.family.path has been specified.
+ // TODO: support detecting cf's from column mapping
+ // TODO: support loading into multiple CF's at a time
+ String path = HiveHFileOutputFormat.getFamilyPath(jobConf, tableProperties);
+ if (path == null || path.isEmpty()) {
+ throw new RuntimeException("Please set " + HiveHFileOutputFormat.HFILE_FAMILY_PATH + " to target location for HFiles");
+ }
+ // TODO: should call HiveHFileOutputFormat#setOutputPath
+ jobProperties.put("mapred.output.dir", path);
+ } else {
+ jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+ }
} // output job properties
}
/**
+ * Return true when HBaseStorageHandler should generate hfiles instead of operating against the
+ * online table. This mode is implicitly applied when "hive.hbase.completebulkload" is true.
+ */
+ public static boolean isHBaseGenerateHFiles(Configuration conf) {
+ return conf.getBoolean("hive.hbase.generatehfiles", false);
+ }
+
+ /**
* Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
* if they are not already present in the jobConf.
* @param jobConf Job configuration
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=1601441&r1=1601440&r2=1601441&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Mon Jun 9 17:01:59 2014
@@ -19,6 +19,9 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
@@ -27,10 +30,14 @@ import java.util.TreeMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
@@ -54,10 +61,9 @@ public class HiveHFileOutputFormat exten
HFileOutputFormat implements
HiveOutputFormat<ImmutableBytesWritable, KeyValue> {
- private static final String HFILE_FAMILY_PATH = "hfile.family.path";
+ public static final String HFILE_FAMILY_PATH = "hfile.family.path";
- static final Log LOG = LogFactory.getLog(
- HiveHFileOutputFormat.class.getName());
+ static final Log LOG = LogFactory.getLog(HiveHFileOutputFormat.class.getName());
private
org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
@@ -70,6 +76,14 @@ public class HiveHFileOutputFormat exten
}
}
+ /**
+ * Retrieve the family path, checking the JobConf first and then the table properties.
+ * @return the family path or null if not specified.
+ */
+ public static String getFamilyPath(Configuration jc, Properties tableProps) {
+ return jc.get(HFILE_FAMILY_PATH, tableProps.getProperty(HFILE_FAMILY_PATH));
+ }
+
@Override
public RecordWriter getHiveRecordWriter(
final JobConf jc,
@@ -79,8 +93,8 @@ public class HiveHFileOutputFormat exten
Properties tableProperties,
final Progressable progressable) throws IOException {
- // Read configuration for the target path
- String hfilePath = tableProperties.getProperty(HFILE_FAMILY_PATH);
+ // Read configuration for the target path, first from jobconf, then from table properties
+ String hfilePath = getFamilyPath(jc, tableProperties);
if (hfilePath == null) {
throw new RuntimeException(
"Please set " + HFILE_FAMILY_PATH + " to target location for HFiles");
@@ -129,20 +143,18 @@ public class HiveHFileOutputFormat exten
if (abort) {
return;
}
- // Move the region file(s) from the task output directory
- // to the location specified by the user. There should
- // actually only be one (each reducer produces one HFile),
- // but we don't know what its name is.
+ // Move the HFile(s) from the task output directory to the
+ // location specified by the user.
FileSystem fs = outputdir.getFileSystem(jc);
fs.mkdirs(columnFamilyPath);
Path srcDir = outputdir;
for (;;) {
FileStatus [] files = fs.listStatus(srcDir);
if ((files == null) || (files.length == 0)) {
- throw new IOException("No files found in " + srcDir);
+ throw new IOException("No family directories found in " + srcDir);
}
if (files.length != 1) {
- throw new IOException("Multiple files found in " + srcDir);
+ throw new IOException("Multiple family directories found in " + srcDir);
}
srcDir = files[0].getPath();
if (srcDir.getName().equals(columnFamilyName)) {
@@ -165,10 +177,9 @@ public class HiveHFileOutputFormat exten
}
}
- @Override
- public void write(Writable w) throws IOException {
+ private void writeText(Text text) throws IOException {
// Decompose the incoming text row into fields.
- String s = ((Text) w).toString();
+ String s = text.toString();
String [] fields = s.split("\u0001");
assert(fields.length <= (columnMap.size() + 1));
// First field is the row key.
@@ -196,11 +207,40 @@ public class HiveHFileOutputFormat exten
valBytes);
try {
fileWriter.write(null, kv);
+ } catch (IOException e) {
+ LOG.error("Failed while writing row: " + s);
+ throw e;
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
}
+
+ private void writePut(PutWritable put) throws IOException {
+ ImmutableBytesWritable row = new ImmutableBytesWritable(put.getPut().getRow());
+ SortedMap<byte[], List<Cell>> cells = put.getPut().getFamilyCellMap();
+ for (Map.Entry<byte[], List<Cell>> entry : cells.entrySet()) {
+ Collections.sort(entry.getValue(), new CellComparator());
+ for (Cell c : entry.getValue()) {
+ try {
+ fileWriter.write(row, KeyValueUtil.copyToNewKeyValue(c));
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ if (w instanceof Text) {
+ writeText((Text) w);
+ } else if (w instanceof PutWritable) {
+ writePut((PutWritable) w);
+ } else {
+ throw new IOException("Unexpected writable " + w);
+ }
+ }
};
}
Added: hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/negative/generatehfiles_require_family_path.q Mon Jun 9 17:01:59 2014
@@ -0,0 +1,10 @@
+-- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk;
+
+CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string');
+
+SET hive.hbase.generatehfiles = true;
+INSERT OVERWRITE TABLE hbase_bulk SELECT * FROM src CLUSTER BY key;
Added: hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q Mon Jun 9 17:01:59 2014
@@ -0,0 +1,23 @@
+-- -*- mode:sql -*-
+
+drop table if exists hb_target;
+
+-- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk');
+
+set hive.hbase.generatehfiles=true;
+set hfile.family.path=/tmp/hb_target/cf;
+
+-- this should produce three files in /tmp/hb_target/cf
+insert overwrite table hb_target select distinct key, value from src cluster by key;
+
+-- To get the files out to your local filesystem for loading into
+-- HBase, run mkdir -p /tmp/blah/cf, then uncomment and
+-- semicolon-terminate the line below before running this test:
+-- dfs -copyToLocal /tmp/hb_target/cf/* /tmp/blah/cf
+
+drop table hb_target;
+dfs -rmr /tmp/hb_target/cf;
Added: hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/negative/generatehfiles_require_family_path.q.out Mon Jun 9 17:01:59 2014
@@ -0,0 +1,20 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+DROP TABLE IF EXISTS hbase_bulk
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: CREATE TABLE hbase_bulk (key INT, value STRING)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:string')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_bulk
+FAILED: RuntimeException Please set hfile.family.path to target location for HFiles
Added: hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out?rev=1601441&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out Mon Jun 9 17:01:59 2014
@@ -0,0 +1,52 @@
+PREHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- -*- mode:sql -*-
+
+drop table if exists hb_target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: -- this is the target HBase table
+create table hb_target(key int, val string)
+stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+with serdeproperties ('hbase.columns.mapping' = ':key,cf:val')
+tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hb_target
+#### A masked pattern was here ####
+insert overwrite table hb_target select distinct key, value from src cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hb_target
+PREHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@hb_target
+PREHOOK: Output: default@hb_target
+POSTHOOK: query: -- To get the files out to your local filesystem for loading into
+#### A masked pattern was here ####
+-- semicolon-terminate the line below before running this test:
+#### A masked pattern was here ####
+
+drop table hb_target
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@hb_target
+POSTHOOK: Output: default@hb_target
+#### A masked pattern was here ####