Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/31 22:15:29 UTC
svn commit: r1195674 - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/hbase/ com/cloudera/sqoop/hive/ com/cloudera/sqoop/io/
org/apache/sqoop/hbase/ org/apache/sqoop/hive/
Author: arvind
Date: Mon Oct 31 21:15:29 2011
New Revision: 1195674
URL: http://svn.apache.org/viewvc?rev=1195674&view=rev
Log:
SQOOP-380. Migrate hive and hbase packages to new namespace.
(Bilung Lee via Arvind Prabhakar)
Added:
incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/
incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/
incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,116 +18,19 @@
package com.cloudera.sqoop.hbase;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
-
/**
- * SqoopRecordProcessor that performs an HBase "put" operation
- * that contains all the fields of the record.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class HBasePutProcessor implements Closeable, Configurable,
- FieldMapProcessor {
-
- /** Configuration key specifying the table to insert into. */
- public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
+public class HBasePutProcessor
+ extends org.apache.sqoop.hbase.HBasePutProcessor {
- /** Configuration key specifying the column family to insert into. */
+ public static final String TABLE_NAME_KEY =
+ org.apache.sqoop.hbase.HBasePutProcessor.TABLE_NAME_KEY;
public static final String COL_FAMILY_KEY =
- "sqoop.hbase.insert.column.family";
-
- /** Configuration key specifying the column of the input whose value
- * should be used as the row id.
- */
+ org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
public static final String ROW_KEY_COLUMN_KEY =
- "sqoop.hbase.insert.row.key.column";
-
- /**
- * Configuration key specifying the PutTransformer implementation to use.
- */
+ org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
public static final String TRANSFORMER_CLASS_KEY =
- "sqoop.hbase.insert.put.transformer.class";
-
- private Configuration conf;
-
- // An object that can transform a map of fieldName->object
- // into a Put command.
- private PutTransformer putTransformer;
-
- private String tableName;
- private HTable table;
-
- public HBasePutProcessor() {
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void setConf(Configuration config) {
- this.conf = config;
-
- // Get the implementation of PutTransformer to use.
- // By default, we call toString() on every non-null field.
- Class<? extends PutTransformer> xformerClass =
- (Class<? extends PutTransformer>)
- this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
- this.putTransformer = (PutTransformer)
- ReflectionUtils.newInstance(xformerClass, this.conf);
- if (null == putTransformer) {
- throw new RuntimeException("Could not instantiate PutTransformer.");
- }
-
- this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
- this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
-
- this.tableName = conf.get(TABLE_NAME_KEY, null);
- try {
- this.table = new HTable(conf, this.tableName);
- } catch (IOException ioe) {
- throw new RuntimeException("Could not access HBase table " + tableName,
- ioe);
- }
- this.table.setAutoFlush(false);
- }
-
- @Override
- public Configuration getConf() {
- return this.conf;
- }
-
- @Override
- /**
- * Processes a record by extracting its field map and converting
- * it into a list of Put commands into HBase.
- */
- public void accept(FieldMappable record)
- throws IOException, ProcessingException {
- Map<String, Object> fields = record.getFieldMap();
-
- List<Put> putList = putTransformer.getPutCommand(fields);
- if (null != putList) {
- for (Put put : putList) {
- this.table.put(put);
- }
- }
- }
+ org.apache.sqoop.hbase.HBasePutProcessor.TRANSFORMER_CLASS_KEY;
- @Override
- /**
- * Closes the HBase table and commits all pending operations.
- */
- public void close() throws IOException {
- this.table.flushCommits();
- this.table.close();
- }
}
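The pattern above repeats for every class touched by this change: the com.cloudera class becomes a thin shim that extends its org.apache counterpart and re-declares the public constants. A minimal sketch of how legacy user code keeps compiling against the shim (the table name is hypothetical; the sqoop and Hadoop jars are assumed to be on the classpath):

    import org.apache.hadoop.conf.Configuration;

    public class LegacyCaller {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The re-declared constant resolves to the same string as the new one.
        conf.set(com.cloudera.sqoop.hbase.HBasePutProcessor.TABLE_NAME_KEY,
            "orders"); // hypothetical table name
        // Instances of the old class are also instances of the new class.
        com.cloudera.sqoop.hbase.HBasePutProcessor proc =
            new com.cloudera.sqoop.hbase.HBasePutProcessor();
        System.out.println(
            proc instanceof org.apache.sqoop.hbase.HBasePutProcessor); // true
      }
    }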
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,33 +19,21 @@
package com.cloudera.sqoop.hbase;
/**
- * This class provides a method that checks if HBase jars are present in the
- * current classpath. It also provides a setAlwaysNoHBaseJarMode mechanism for
- * testing, simulating the condition where no HBase jar is on the
- * classpath (since HBase is pulled in automatically by ivy).
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public final class HBaseUtil {
- private static boolean testingMode = false;
- private HBaseUtil() {
- }
+ private HBaseUtil() { }
/**
* This is a way to make this always return false for testing.
*/
public static void setAlwaysNoHBaseJarMode(boolean mode) {
- testingMode = mode;
+ org.apache.sqoop.hbase.HBaseUtil.setAlwaysNoHBaseJarMode(mode);
}
public static boolean isHBaseJarPresent() {
- if (testingMode) {
- return false;
- }
- try {
- Class.forName("org.apache.hadoop.hbase.client.HTable");
- } catch (ClassNotFoundException cnfe) {
- return false;
- }
- return true;
+ return org.apache.sqoop.hbase.HBaseUtil.isHBaseJarPresent();
}
+
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,58 +18,9 @@
package com.cloudera.sqoop.hbase;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Put;
-
/**
- * Interface that takes a map of jdbc field names to values
- * and converts them to a Put command for HBase.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public abstract class PutTransformer {
-
- public PutTransformer() {
- }
-
- private String columnFamily;
- private String rowKeyColumn;
-
- /**
- * @return the default column family to insert into.
- */
- public String getColumnFamily() {
- return this.columnFamily;
- }
-
- /**
- * Set the default column family to insert into.
- */
- public void setColumnFamily(String colFamily) {
- this.columnFamily = colFamily;
- }
-
- /**
- * @return the field name identifying the value to use as the row id.
- */
- public String getRowKeyColumn() {
- return this.rowKeyColumn;
- }
-
- /**
- * Set the column of the input fields which should be used to calculate
- * the row id.
- */
- public void setRowKeyColumn(String rowKeyCol) {
- this.rowKeyColumn = rowKeyCol;
- }
-
- /**
- * Returns a list of Put commands that inserts the fields into a row in HBase.
- * @param fields a map of field names to values to insert.
- * @return A list of Put commands that inserts these into HBase.
- */
- public abstract List<Put> getPutCommand(Map<String, Object> fields)
- throws IOException;
+public abstract class PutTransformer
+ extends org.apache.sqoop.hbase.PutTransformer {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -33,70 +31,8 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.util.Bytes;
/**
- * PutTransformer that calls toString on all non-null fields.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class ToStringPutTransformer extends PutTransformer {
-
- public static final Log LOG = LogFactory.getLog(
- ToStringPutTransformer.class.getName());
-
- // A mapping from field name -> bytes for that field name.
- // Used to cache serialization work done for fields names.
- private Map<String, byte[]> serializedFieldNames;
-
- public ToStringPutTransformer() {
- serializedFieldNames = new TreeMap<String, byte[]>();
- }
-
- /**
- * Return the serialized bytes for a field name, using
- * the cache if it's already in there.
- */
- private byte [] getFieldNameBytes(String fieldName) {
- byte [] cachedName = serializedFieldNames.get(fieldName);
- if (null != cachedName) {
- // Cache hit. We're done.
- return cachedName;
- }
-
- // Do the serialization and memoize the result.
- byte [] nameBytes = Bytes.toBytes(fieldName);
- serializedFieldNames.put(fieldName, nameBytes);
- return nameBytes;
- }
-
- @Override
- /** {@inheritDoc} */
- public List<Put> getPutCommand(Map<String, Object> fields)
- throws IOException {
-
- String rowKeyCol = getRowKeyColumn();
- String colFamily = getColumnFamily();
- byte [] colFamilyBytes = Bytes.toBytes(colFamily);
-
- Object rowKey = fields.get(rowKeyCol);
- if (null == rowKey) {
- // If the row-key column is null, we don't insert this row.
- LOG.warn("Could not insert row with null value for row-key column: "
- + rowKeyCol);
- return null;
- }
-
- Put put = new Put(Bytes.toBytes(rowKey.toString()));
-
- for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
- String colName = fieldEntry.getKey();
- if (!colName.equals(rowKeyCol)) {
- // This is a regular field, not the row key.
- // Add it if it's not null.
- Object val = fieldEntry.getValue();
- if (null != val) {
- put.add(colFamilyBytes, getFieldNameBytes(colName),
- Bytes.toBytes(val.toString()));
- }
- }
- }
-
- return Collections.singletonList(put);
- }
+public class ToStringPutTransformer
+ extends org.apache.sqoop.hbase.ToStringPutTransformer {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,334 +18,20 @@
package com.cloudera.sqoop.hive;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.io.CodecMap;
import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.util.Executor;
-import com.cloudera.sqoop.util.ExitSecurityException;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
-import com.cloudera.sqoop.util.SubprocessSecurityManager;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Tool;
/**
- * Utility to import a table into the Hive metastore. Manages the connection
- * to Hive itself as well as orchestrating the use of the other classes in this
- * package.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class HiveImport {
-
- public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
-
- private SqoopOptions options;
- private ConnManager connManager;
- private Configuration configuration;
- private boolean generateOnly;
-
- /** Entry point through which Hive invocation should be attempted. */
- private static final String HIVE_MAIN_CLASS =
- "org.apache.hadoop.hive.cli.CliDriver";
+public class HiveImport
+ extends org.apache.sqoop.hive.HiveImport {
public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
final Configuration conf, final boolean generateOnly) {
- this.options = opts;
- this.connManager = connMgr;
- this.configuration = conf;
- this.generateOnly = generateOnly;
- }
-
-
- /**
- * @return the filename of the hive executable to run to do the import
- */
- private String getHiveBinPath() {
- // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
- // exists.
- // Fall back to just plain 'hive' and hope it's in the path.
-
- String hiveHome = options.getHiveHome();
- if (null == hiveHome) {
- return "hive";
- }
-
- Path p = new Path(hiveHome);
- p = new Path(p, "bin");
- p = new Path(p, "hive");
- String hiveBinStr = p.toString();
- if (new File(hiveBinStr).exists()) {
- return hiveBinStr;
- } else {
- return "hive";
- }
- }
-
- /**
- * If we used a MapReduce-based upload of the data, remove the _logs dir
- * from where we put it, before running Hive LOAD DATA INPATH.
- */
- private void removeTempLogs(String tableName) throws IOException {
- FileSystem fs = FileSystem.get(configuration);
- Path tablePath;
- if (null != tableName) {
- String warehouseDir = options.getWarehouseDir();
- if (warehouseDir != null) {
- tablePath = new Path(new Path(warehouseDir), tableName);
- } else {
- tablePath = new Path(tableName);
- }
- } else {
- // --table option is not used, so use the target dir instead
- tablePath = new Path(options.getTargetDir());
- }
-
- Path logsPath = new Path(tablePath, "_logs");
- if (fs.exists(logsPath)) {
- LOG.info("Removing temporary files from import process: " + logsPath);
- if (!fs.delete(logsPath, true)) {
- LOG.warn("Could not delete temporary files; "
- + "continuing with import, but it may fail.");
- }
- }
- }
-
- /**
- * @return true if we're just generating the DDL for the import, but
- * not actually running it (i.e., --generate-only mode). If so, don't
- * do any side-effecting actions in Hive.
- */
- private boolean isGenerateOnly() {
- return generateOnly;
- }
-
- /**
- * @return a File object that can be used to write the DDL statement.
- * If we're in gen-only mode, this should be a file in the outdir, named
- * after the Hive table we're creating. If we're in import mode, this should
- * be a one-off temporary file.
- */
- private File getScriptFile(String outputTableName) throws IOException {
- if (!isGenerateOnly()) {
- return File.createTempFile("hive-script-", ".txt",
- new File(options.getTempDir()));
- } else {
- return new File(new File(options.getCodeOutputDir()),
- outputTableName + ".q");
- }
+ super(opts, connMgr, conf, generateOnly);
}
- /**
- * Perform the import of data from an HDFS path to a Hive table.
- *
- * @param inputTableName the name of the table as loaded into HDFS
- * @param outputTableName the name of the table to create in Hive.
- * @param createOnly if true, run the CREATE TABLE statement but not
- * LOAD DATA.
- */
- public void importTable(String inputTableName, String outputTableName,
- boolean createOnly) throws IOException {
-
- if (!isGenerateOnly()) {
- removeTempLogs(inputTableName);
- LOG.info("Loading uploaded data into Hive");
- }
-
- if (null == outputTableName) {
- outputTableName = inputTableName;
- }
- LOG.debug("Hive.inputTable: " + inputTableName);
- LOG.debug("Hive.outputTable: " + outputTableName);
-
- // For testing purposes against our mock hive implementation,
- // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
- // environment variable for the child hive process. We also disable
- // timestamp comments so that we have deterministic table creation scripts.
- String expectedScript = System.getProperty("expected.script");
- List<String> env = Executor.getCurEnvpStrings();
- boolean debugMode = expectedScript != null;
- if (debugMode) {
- env.add("EXPECTED_SCRIPT=" + expectedScript);
- env.add("TMPDIR=" + options.getTempDir());
- }
-
- // generate the HQL statements to run.
- TableDefWriter tableWriter = new TableDefWriter(options, connManager,
- inputTableName, outputTableName,
- configuration, !debugMode);
- String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
- String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
-
- if (!isGenerateOnly()) {
- String codec = options.getCompressionCodec();
- if (codec != null && (codec.equals(CodecMap.LZOP)
- || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
- try {
- String finalPathStr = tableWriter.getFinalPathStr();
- Tool tool = ReflectionUtils.newInstance(Class.
- forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
- asSubclass(Tool.class), configuration);
- ToolRunner.run(configuration, tool, new String[] { finalPathStr });
- } catch (Exception ex) {
- LOG.error("Error indexing lzo files", ex);
- throw new IOException("Error indexing lzo files", ex);
- }
- }
- }
-
- // write them to a script file.
- File scriptFile = getScriptFile(outputTableName);
- try {
- String filename = scriptFile.toString();
- BufferedWriter w = null;
- try {
- FileOutputStream fos = new FileOutputStream(scriptFile);
- w = new BufferedWriter(new OutputStreamWriter(fos));
- w.write(createTableStr, 0, createTableStr.length());
- if (!createOnly) {
- w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
- }
- } catch (IOException ioe) {
- LOG.error("Error writing Hive load-in script: " + ioe.toString());
- ioe.printStackTrace();
- throw ioe;
- } finally {
- if (null != w) {
- try {
- w.close();
- } catch (IOException ioe) {
- LOG.warn("IOException closing stream to Hive script: "
- + ioe.toString());
- }
- }
- }
-
- if (!isGenerateOnly()) {
- executeScript(filename, env);
-
- LOG.info("Hive import complete.");
- }
- } finally {
- if (!isGenerateOnly()) {
- // User isn't interested in saving the DDL. Remove the file.
- if (!scriptFile.delete()) {
- LOG.warn("Could not remove temporary file: " + scriptFile.toString());
- // try to delete the file later.
- scriptFile.deleteOnExit();
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- /**
- * Execute the script file via Hive.
- * If Hive's jars are on the classpath, run it in the same process.
- * Otherwise, execute the file with 'bin/hive'.
- *
- * @param filename The script file to run.
- * @param env the environment strings to pass to any subprocess.
- * @throws IOException if Hive did not exit successfully.
- */
- private void executeScript(String filename, List<String> env)
- throws IOException {
- SubprocessSecurityManager subprocessSM = null;
-
- try {
- Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
-
- // We loaded the CLI Driver in this JVM, so we will just
- // call it in-process. The CliDriver class has a method:
- // void main(String [] args) throws Exception.
- //
- // We'll call that here to invoke 'hive -f scriptfile'.
- // Because this method will call System.exit(), we use
- // a SecurityManager to prevent this.
- LOG.debug("Using in-process Hive instance.");
-
- subprocessSM = new SubprocessSecurityManager();
- subprocessSM.install();
-
- // Create the argv for the Hive Cli Driver.
- String [] argArray = new String[2];
- argArray[0] = "-f";
- argArray[1] = filename;
-
- // And invoke the static method on this array.
- Method mainMethod = cliDriverClass.getMethod("main", argArray.getClass());
- mainMethod.invoke(null, (Object) argArray);
-
- } catch (ClassNotFoundException cnfe) {
- // Hive is not on the classpath. Run externally.
- // This is not an error path.
- LOG.debug("Using external Hive process.");
- executeExternalHiveScript(filename, env);
- } catch (NoSuchMethodException nsme) {
- // Could not find a handle to the main() method.
- throw new IOException("Could not access CliDriver.main()", nsme);
- } catch (IllegalAccessException iae) {
- // Error getting a handle on the main() method.
- throw new IOException("Could not access CliDriver.main()", iae);
- } catch (InvocationTargetException ite) {
- // We ran CliDriver.main() and an exception was thrown from within Hive.
- // This may have been the ExitSecurityException triggered by the
- // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
- // an IOException and rethrow.
-
- Throwable cause = ite.getCause();
- if (cause instanceof ExitSecurityException) {
- ExitSecurityException ese = (ExitSecurityException) cause;
- int status = ese.getExitStatus();
- if (status != 0) {
- throw new IOException("Hive CliDriver exited with status=" + status);
- }
- } else {
- throw new IOException("Exception thrown in Hive", ite);
- }
- } finally {
- if (null != subprocessSM) {
- // Uninstall the SecurityManager used to trap System.exit().
- subprocessSM.uninstall();
- }
- }
- }
-
- /**
- * Execute Hive via an external 'bin/hive' process.
- * @param filename the Script file to run.
- * @param env the environment strings to pass to any subprocess.
- * @throws IOException if Hive did not exit successfully.
- */
- private void executeExternalHiveScript(String filename, List<String> env)
- throws IOException {
- // run Hive on the script and note the return code.
- String hiveExec = getHiveBinPath();
- ArrayList<String> args = new ArrayList<String>();
- args.add(hiveExec);
- args.add("-f");
- args.add(filename);
-
- LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
- int ret = Executor.exec(args.toArray(new String[0]),
- env.toArray(new String[0]), logSink, logSink);
- if (0 != ret) {
- throw new IOException("Hive exited with status " + ret);
- }
- }
}
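The deprecated HiveImport keeps only a constructor that delegates to super, so existing call sites remain source-compatible. A hedged sketch of such a call site (opts and mgr are assumed to be built by the surrounding tool; the table names are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.manager.ConnManager;
    import com.cloudera.sqoop.hive.HiveImport;

    public class HiveImportCaller {
      // opts and mgr are assumed to be created by the surrounding tool.
      public static void runImport(SqoopOptions opts, ConnManager mgr,
          Configuration conf) throws IOException {
        // generateOnly=false: write the DDL script and execute it in Hive.
        HiveImport importer = new HiveImport(opts, mgr, conf, false);
        importer.importTable("orders", "orders_hive", false); // hypothetical
      }
    }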
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,71 +18,19 @@
package com.cloudera.sqoop.hive;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.sql.Types;
-
/**
- * Defines conversion between SQL types and Hive types.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public final class HiveTypes {
- public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
+ private HiveTypes() { }
- private HiveTypes() {
- }
-
- /**
- * Given JDBC SQL types coming from another database, what is the best
- * mapping to a Hive-specific type?
- */
public static String toHiveType(int sqlType) {
-
- switch (sqlType) {
- case Types.INTEGER:
- case Types.SMALLINT:
- return "INT";
- case Types.VARCHAR:
- case Types.CHAR:
- case Types.LONGVARCHAR:
- case Types.NVARCHAR:
- case Types.NCHAR:
- case Types.LONGNVARCHAR:
- case Types.DATE:
- case Types.TIME:
- case Types.TIMESTAMP:
- case Types.CLOB:
- return "STRING";
- case Types.NUMERIC:
- case Types.DECIMAL:
- case Types.FLOAT:
- case Types.DOUBLE:
- case Types.REAL:
- return "DOUBLE";
- case Types.BIT:
- case Types.BOOLEAN:
- return "BOOLEAN";
- case Types.TINYINT:
- return "TINYINT";
- case Types.BIGINT:
- return "BIGINT";
- default:
- // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
- // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
- return null;
- }
+ return org.apache.sqoop.hive.HiveTypes.toHiveType(sqlType);
}
- /**
- * @return true if a sql type can't be translated to a precise match
- * in Hive, and we have to cast it to something more generic.
- */
public static boolean isHiveTypeImprovised(int sqlType) {
- return sqlType == Types.DATE || sqlType == Types.TIME
- || sqlType == Types.TIMESTAMP
- || sqlType == Types.DECIMAL
- || sqlType == Types.NUMERIC;
+ return org.apache.sqoop.hive.HiveTypes.isHiveTypeImprovised(sqlType);
}
}
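Both static methods now forward to org.apache.sqoop.hive.HiveTypes, so the mapping spelled out in the removed switch statement is unchanged. A small sketch of the preserved behavior (expected outputs taken from that switch):

    import java.sql.Types;
    import com.cloudera.sqoop.hive.HiveTypes;

    public class HiveTypesDemo {
      public static void main(String[] args) {
        System.out.println(HiveTypes.toHiveType(Types.INTEGER)); // INT
        System.out.println(HiveTypes.toHiveType(Types.VARCHAR)); // STRING
        System.out.println(HiveTypes.toHiveType(Types.DECIMAL)); // DOUBLE
        // DECIMAL widens to DOUBLE, losing precision, so it is "improvised":
        System.out.println(
            HiveTypes.isHiveTypeImprovised(Types.DECIMAL)); // true
      }
    }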
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,268 +19,25 @@
package com.cloudera.sqoop.hive;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.io.CodecMap;
import com.cloudera.sqoop.manager.ConnManager;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Date;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
-import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
- * Creates (Hive-specific) SQL DDL statements to create tables to hold data
- * we're importing from another source.
- *
- * After we import the database into HDFS, we can inject it into Hive using
- * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class TableDefWriter {
-
- public static final Log LOG = LogFactory.getLog(
- TableDefWriter.class.getName());
+public class TableDefWriter
+ extends org.apache.sqoop.hive.TableDefWriter {
- private SqoopOptions options;
- private ConnManager connManager;
- private Configuration configuration;
- private String inputTableName;
- private String outputTableName;
- private boolean commentsEnabled;
-
- /**
- * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
- * @param opts program-wide options
- * @param connMgr the connection manager used to describe the table.
- * @param inputTable the name of the table to load.
- * @param outputTable the name of the Hive table to create.
- * @param config the Hadoop configuration to use to connect to the dfs
- * @param withComments if true, then tables will be created with a
- * timestamp comment.
- */
public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
final String inputTable, final String outputTable,
final Configuration config, final boolean withComments) {
- this.options = opts;
- this.connManager = connMgr;
- this.inputTableName = inputTable;
- this.outputTableName = outputTable;
- this.configuration = config;
- this.commentsEnabled = withComments;
+ super(opts, connMgr, inputTable, outputTable, config, withComments);
}
- private Map<String, Integer> externalColTypes;
-
- /**
- * Set the column type map to be used.
- * (dependency injection for testing; not used in production.)
- */
- void setColumnTypes(Map<String, Integer> colTypes) {
- this.externalColTypes = colTypes;
- LOG.debug("Using test-controlled type map");
- }
-
- /**
- * Get the column names to import.
- */
- private String [] getColumnNames() {
- String [] colNames = options.getColumns();
- if (null != colNames) {
- return colNames; // user-specified column names.
- } else if (null != externalColTypes) {
- // Test-injection column mapping. Extract the col names from this.
- ArrayList<String> keyList = new ArrayList<String>();
- for (String key : externalColTypes.keySet()) {
- keyList.add(key);
- }
-
- return keyList.toArray(new String[keyList.size()]);
- } else if (null != inputTableName) {
- return connManager.getColumnNames(inputTableName);
- } else {
- return connManager.getColumnNamesForQuery(options.getSqlQuery());
- }
+ public static String getHiveOctalCharCode(int charNum) {
+ return org.apache.sqoop.hive.TableDefWriter.getHiveOctalCharCode(charNum);
}
- /**
- * @return the CREATE TABLE statement for the table to load into hive.
- */
- public String getCreateTableStmt() throws IOException {
- Map<String, Integer> columnTypes;
- Properties userMapping = options.getMapColumnHive();
-
- if (externalColTypes != null) {
- // Use pre-defined column types.
- columnTypes = externalColTypes;
- } else {
- // Get these from the database.
- if (null != inputTableName) {
- columnTypes = connManager.getColumnTypes(inputTableName);
- } else {
- columnTypes = connManager.getColumnTypesForQuery(options.getSqlQuery());
- }
- }
-
- String [] colNames = getColumnNames();
- StringBuilder sb = new StringBuilder();
- if (options.doFailIfHiveTableExists()) {
- sb.append("CREATE TABLE `").append(outputTableName).append("` ( ");
- } else {
- sb.append("CREATE TABLE IF NOT EXISTS `");
- sb.append(outputTableName).append("` ( ");
- }
-
- // Check that all explicitly mapped columns are present in result set
- for(Object column : userMapping.keySet()) {
- boolean found = false;
- for(String c : colNames) {
- if(c.equals(column)) {
- found = true;
- break;
- }
- }
-
- if(!found) {
- throw new IllegalArgumentException("No column by the name " + column
- + "found while importing data");
- }
- }
-
- boolean first = true;
- for (String col : colNames) {
- if (!first) {
- sb.append(", ");
- }
-
- first = false;
-
- Integer colType = columnTypes.get(col);
- String hiveColType = userMapping.getProperty(col);
- if(hiveColType == null) { hiveColType = connManager.toHiveType(colType); }
- if (null == hiveColType) {
- throw new IOException("Hive does not support the SQL type for column "
- + col);
- }
-
- sb.append('`').append(col).append("` ").append(hiveColType);
-
- if (HiveTypes.isHiveTypeImprovised(colType)) {
- LOG.warn(
- "Column " + col + " had to be cast to a less precise type in Hive");
- }
- }
-
- sb.append(") ");
-
- if (commentsEnabled) {
- DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- String curDateStr = dateFormat.format(new Date());
- sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
- }
-
- if (options.getHivePartitionKey() != null) {
- sb.append("PARTITIONED BY (")
- .append(options.getHivePartitionKey())
- .append(" STRING) ");
- }
-
- sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
- sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
- sb.append("' LINES TERMINATED BY '");
- sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
- String codec = options.getCompressionCodec();
- if (codec != null && (codec.equals(CodecMap.LZOP)
- || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
- sb.append("' STORED AS INPUTFORMAT "
- + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
- sb.append(" OUTPUTFORMAT "
- + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
- } else {
- sb.append("' STORED AS TEXTFILE");
- }
-
- LOG.debug("Create statement: " + sb.toString());
- return sb.toString();
- }
-
- private static final int DEFAULT_HDFS_PORT =
- org.apache.hadoop.hdfs.server.namenode.NameNode.DEFAULT_PORT;
-
- /**
- * @return the LOAD DATA statement to import the data in HDFS into hive.
- */
- public String getLoadDataStmt() throws IOException {
- String finalPathStr = getFinalPathStr();
-
- StringBuilder sb = new StringBuilder();
- sb.append("LOAD DATA INPATH '");
- sb.append(finalPathStr + "'");
- if (options.doOverwriteHiveTable()) {
- sb.append(" OVERWRITE");
- }
- sb.append(" INTO TABLE `");
- sb.append(outputTableName);
- sb.append('`');
-
- if (options.getHivePartitionKey() != null) {
- sb.append(" PARTITION (")
- .append(options.getHivePartitionKey())
- .append("='").append(options.getHivePartitionValue())
- .append("')");
- }
-
- LOG.debug("Load statement: " + sb.toString());
- return sb.toString();
- }
-
- public String getFinalPathStr() throws IOException {
- String warehouseDir = options.getWarehouseDir();
- if (null == warehouseDir) {
- warehouseDir = "";
- } else if (!warehouseDir.endsWith(File.separator)) {
- warehouseDir = warehouseDir + File.separator;
- }
-
- String tablePath;
- if (null != inputTableName) {
- tablePath = warehouseDir + inputTableName;
- } else {
- tablePath = options.getTargetDir();
- }
- FileSystem fs = FileSystem.get(configuration);
- Path finalPath = new Path(tablePath).makeQualified(fs);
- return finalPath.toString();
- }
-
- /**
- * Return a string identifying the character to use as a delimiter
- * in Hive, in octal representation.
- * Hive can specify delimiter characters in the form '\ooo' where
- * ooo is a three-digit octal number between 000 and 177. Values
- * may not be truncated ('\12' is wrong; '\012' is ok) nor may they
- * be zero-prefixed (e.g., '\0177' is wrong).
- *
- * @param charNum the character to use as a delimiter
- * @return a string of the form "\ooo" where ooo is an octal number
- * in [000, 177].
- * @throws IllegalArgumentException if charNum > 0177.
- */
- static String getHiveOctalCharCode(int charNum) {
- if (charNum > 0177) {
- throw new IllegalArgumentException(
- "Character " + charNum + " is an out-of-range delimiter");
- }
-
- return String.format("\\%03o", charNum);
- }
}
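Note that getHiveOctalCharCode is promoted from package-private to public in the shim so it can delegate across packages. The removed javadoc describes the format it preserves: Hive delimiters are written as three-digit octal escapes, zero-padded but never zero-prefixed. A one-method sketch of the conversion:

    public class OctalDemo {
      public static void main(String[] args) {
        // ',' is 0x2C = 054 octal; Hive expects the zero-padded form "\054".
        System.out.println(String.format("\\%03o", (int) ',')); // \054
        System.out.println(com.cloudera.sqoop.hive.TableDefWriter
            .getHiveOctalCharCode(1)); // \001
      }
    }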
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java Mon Oct 31 21:15:29 2011
@@ -18,9 +18,6 @@
package com.cloudera.sqoop.io;
-import org.apache.sqoop.io.SplittingOutputStream;
-
-
/**
* A BufferedWriter implementation that wraps around a SplittingOutputStream
* and allows splitting of the underlying stream.
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * SqoopRecordProcessor that performs an HBase "put" operation
+ * that contains all the fields of the record.
+ */
+public class HBasePutProcessor implements Closeable, Configurable,
+ FieldMapProcessor {
+
+ /** Configuration key specifying the table to insert into. */
+ public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
+
+ /** Configuration key specifying the column family to insert into. */
+ public static final String COL_FAMILY_KEY =
+ "sqoop.hbase.insert.column.family";
+
+ /** Configuration key specifying the column of the input whose value
+ * should be used as the row id.
+ */
+ public static final String ROW_KEY_COLUMN_KEY =
+ "sqoop.hbase.insert.row.key.column";
+
+ /**
+ * Configuration key specifying the PutTransformer implementation to use.
+ */
+ public static final String TRANSFORMER_CLASS_KEY =
+ "sqoop.hbase.insert.put.transformer.class";
+
+ private Configuration conf;
+
+ // An object that can transform a map of fieldName->object
+ // into a Put command.
+ private PutTransformer putTransformer;
+
+ private String tableName;
+ private HTable table;
+
+ public HBasePutProcessor() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setConf(Configuration config) {
+ this.conf = config;
+
+ // Get the implementation of PutTransformer to use.
+ // By default, we call toString() on every non-null field.
+ Class<? extends PutTransformer> xformerClass =
+ (Class<? extends PutTransformer>)
+ this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
+ this.putTransformer = (PutTransformer)
+ ReflectionUtils.newInstance(xformerClass, this.conf);
+ if (null == putTransformer) {
+ throw new RuntimeException("Could not instantiate PutTransformer.");
+ }
+
+ this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+ this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+
+ this.tableName = conf.get(TABLE_NAME_KEY, null);
+ try {
+ this.table = new HTable(conf, this.tableName);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Could not access HBase table " + tableName,
+ ioe);
+ }
+ this.table.setAutoFlush(false);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ /**
+ * Processes a record by extracting its field map and converting
+ * it into a list of Put commands for HBase.
+ */
+ public void accept(FieldMappable record)
+ throws IOException, ProcessingException {
+ Map<String, Object> fields = record.getFieldMap();
+
+ List<Put> putList = putTransformer.getPutCommand(fields);
+ if (null != putList) {
+ for (Put put : putList) {
+ this.table.put(put);
+ }
+ }
+ }
+
+ @Override
+ /**
+ * Closes the HBase table and commits all pending operations.
+ */
+ public void close() throws IOException {
+ this.table.flushCommits();
+ this.table.close();
+ }
+
+}
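The four configuration keys above form the processor's public contract. A hedged sketch of wiring them up (the table, family, and column names are hypothetical, and setConf requires a reachable HBase cluster with the named table already created):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.hbase.HBasePutProcessor;

    public class ProcessorSetup {
      public static HBasePutProcessor configure() {
        Configuration conf = new Configuration();
        conf.set(HBasePutProcessor.TABLE_NAME_KEY, "orders");       // hypothetical
        conf.set(HBasePutProcessor.COL_FAMILY_KEY, "d");            // hypothetical
        conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, "order_id"); // hypothetical
        // TRANSFORMER_CLASS_KEY is optional; ToStringPutTransformer is the
        // default.
        HBasePutProcessor processor = new HBasePutProcessor();
        processor.setConf(conf); // opens the HTable and disables auto-flush
        return processor;
      }
    }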
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+/**
+ * This class provides a method that checks if HBase jars are present in the
+ * current classpath. It also provides a setAlwaysNoHBaseJarMode mechanism for
+ * testing, simulating the condition where no HBase jar is on the
+ * classpath (since HBase is pulled in automatically by ivy).
+ */
+public final class HBaseUtil {
+
+ private static boolean testingMode = false;
+
+ private HBaseUtil() {
+ }
+
+ /**
+ * This is a way to make this always return false for testing.
+ */
+ public static void setAlwaysNoHBaseJarMode(boolean mode) {
+ testingMode = mode;
+ }
+
+ public static boolean isHBaseJarPresent() {
+ if (testingMode) {
+ return false;
+ }
+ try {
+ Class.forName("org.apache.hadoop.hbase.client.HTable");
+ } catch (ClassNotFoundException cnfe) {
+ return false;
+ }
+ return true;
+ }
+
+}
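isHBaseJarPresent is intended as a preflight check before any HBase-specific code path runs, and setAlwaysNoHBaseJarMode lets tests force the negative case. A short sketch of both (the message printed is illustrative):

    import org.apache.sqoop.hbase.HBaseUtil;

    public class HBasePreflight {
      public static void main(String[] args) {
        HBaseUtil.setAlwaysNoHBaseJarMode(true); // simulate a missing HBase jar
        if (!HBaseUtil.isHBaseJarPresent()) {
          System.out.println("HBase jars not found; skipping HBase import.");
        }
      }
    }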
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Abstract base class that takes a map of JDBC field names to values
+ * and converts them into Put commands for HBase.
+ */
+public abstract class PutTransformer {
+
+ private String columnFamily;
+ private String rowKeyColumn;
+
+ /**
+ * @return the default column family to insert into.
+ */
+ public String getColumnFamily() {
+ return this.columnFamily;
+ }
+
+ /**
+ * Set the default column family to insert into.
+ */
+ public void setColumnFamily(String colFamily) {
+ this.columnFamily = colFamily;
+ }
+
+ /**
+ * @return the field name identifying the value to use as the row id.
+ */
+ public String getRowKeyColumn() {
+ return this.rowKeyColumn;
+ }
+
+ /**
+ * Set the column of the input fields which should be used to calculate
+ * the row id.
+ */
+ public void setRowKeyColumn(String rowKeyCol) {
+ this.rowKeyColumn = rowKeyCol;
+ }
+
+ /**
+ * Returns a list of Put commands that inserts the fields into a row in HBase.
+ * @param fields a map of field names to values to insert.
+ * @return A list of Put commands that inserts these into HBase.
+ */
+ public abstract List<Put> getPutCommand(Map<String, Object> fields)
+ throws IOException;
+
+}
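PutTransformer is the extension point selected by HBasePutProcessor.TRANSFORMER_CLASS_KEY; the column family and row-key column are injected through the setters before getPutCommand is called. A minimal custom implementation might look like this sketch (the class name and the "raw" qualifier are hypothetical; Put.add(family, qualifier, value) is the HBase client API of this era, as used elsewhere in this commit):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Hypothetical transformer that stores the whole record in one cell. */
    public class SingleCellPutTransformer
        extends org.apache.sqoop.hbase.PutTransformer {
      @Override
      public List<Put> getPutCommand(Map<String, Object> fields)
          throws IOException {
        Object rowKey = fields.get(getRowKeyColumn());
        if (null == rowKey) {
          return null; // same contract as ToStringPutTransformer: skip the row
        }
        Put put = new Put(Bytes.toBytes(rowKey.toString()));
        put.add(Bytes.toBytes(getColumnFamily()),
            Bytes.toBytes("raw"), // hypothetical qualifier
            Bytes.toBytes(fields.toString()));
        return Collections.singletonList(put);
      }
    }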
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.cloudera.sqoop.hbase.PutTransformer;
+
+/**
+ * PutTransformer that calls toString on all non-null fields.
+ */
+public class ToStringPutTransformer extends PutTransformer {
+
+ public static final Log LOG = LogFactory.getLog(
+ ToStringPutTransformer.class.getName());
+
+ // A mapping from field name -> bytes for that field name.
+ // Used to cache serialization work done for fields names.
+ private Map<String, byte[]> serializedFieldNames;
+
+ public ToStringPutTransformer() {
+ serializedFieldNames = new TreeMap<String, byte[]>();
+ }
+
+ /**
+ * Return the serialized bytes for a field name, using
+ * the cache if it's already in there.
+ */
+ private byte [] getFieldNameBytes(String fieldName) {
+ byte [] cachedName = serializedFieldNames.get(fieldName);
+ if (null != cachedName) {
+ // Cache hit. We're done.
+ return cachedName;
+ }
+
+ // Do the serialization and memoize the result.
+ byte [] nameBytes = Bytes.toBytes(fieldName);
+ serializedFieldNames.put(fieldName, nameBytes);
+ return nameBytes;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public List<Put> getPutCommand(Map<String, Object> fields)
+ throws IOException {
+
+ String rowKeyCol = getRowKeyColumn();
+ String colFamily = getColumnFamily();
+ byte [] colFamilyBytes = Bytes.toBytes(colFamily);
+
+ Object rowKey = fields.get(rowKeyCol);
+ if (null == rowKey) {
+ // If the row-key column is null, we don't insert this row.
+ LOG.warn("Could not insert row with null value for row-key column: "
+ + rowKeyCol);
+ return null;
+ }
+
+ Put put = new Put(Bytes.toBytes(rowKey.toString()));
+
+ for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
+ String colName = fieldEntry.getKey();
+ if (!colName.equals(rowKeyCol)) {
+ // This is a regular field, not the row key.
+ // Add it if it's not null.
+ Object val = fieldEntry.getValue();
+ if (null != val) {
+ put.add(colFamilyBytes, getFieldNameBytes(colName),
+ Bytes.toBytes(val.toString()));
+ }
+ }
+ }
+
+ return Collections.singletonList(put);
+ }
+
+}
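Because Put objects are plain client-side containers, the transformer can be exercised without a live cluster. A small sketch with an in-memory field map (the column names are hypothetical):

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.sqoop.hbase.ToStringPutTransformer;

    public class TransformerDemo {
      public static void main(String[] args) throws IOException {
        ToStringPutTransformer xform = new ToStringPutTransformer();
        xform.setColumnFamily("d");  // hypothetical column family
        xform.setRowKeyColumn("id"); // hypothetical row-key column

        Map<String, Object> fields = new TreeMap<String, Object>();
        fields.put("id", 42);
        fields.put("name", "widget");
        fields.put("price", null);   // null values are skipped, not stored

        List<Put> puts = xform.getPutCommand(fields);
        System.out.println(puts.size()); // 1: a single Put for row key "42"
      }
    }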
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.sqoop.io.CodecMap;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.SubprocessSecurityManager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.util.ExitSecurityException;
+
+/**
+ * Utility to import a table into the Hive metastore. Manages the connection
+ * to Hive itself as well as orchestrating the use of the other classes in this
+ * package.
+ */
+public class HiveImport {
+
+ public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
+
+ private SqoopOptions options;
+ private ConnManager connManager;
+ private Configuration configuration;
+ private boolean generateOnly;
+
+ /** Entry point through which Hive invocation should be attempted. */
+ private static final String HIVE_MAIN_CLASS =
+ "org.apache.hadoop.hive.cli.CliDriver";
+
+ public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
+ final Configuration conf, final boolean generateOnly) {
+ this.options = opts;
+ this.connManager = connMgr;
+ this.configuration = conf;
+ this.generateOnly = generateOnly;
+ }
+
+
+ /**
+ * @return the filename of the hive executable to run to do the import
+ */
+ private String getHiveBinPath() {
+ // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
+ // exists.
+ // Fall back to just plain 'hive' and hope it's in the path.
+
+ String hiveHome = options.getHiveHome();
+ if (null == hiveHome) {
+ return "hive";
+ }
+
+ Path p = new Path(hiveHome);
+ p = new Path(p, "bin");
+ p = new Path(p, "hive");
+ String hiveBinStr = p.toString();
+ if (new File(hiveBinStr).exists()) {
+ return hiveBinStr;
+ } else {
+ return "hive";
+ }
+ }
+
+ /**
+ * If we used a MapReduce-based upload of the data, remove the _logs dir
+ * from where we put it, before running Hive LOAD DATA INPATH.
+ */
+ private void removeTempLogs(String tableName) throws IOException {
+ FileSystem fs = FileSystem.get(configuration);
+ Path tablePath;
+ if (null != tableName) {
+ String warehouseDir = options.getWarehouseDir();
+ if (warehouseDir != null) {
+ tablePath = new Path(new Path(warehouseDir), tableName);
+ } else {
+ tablePath = new Path(tableName);
+ }
+ } else {
+ // --table option is not used, so use the target dir instead
+ tablePath = new Path(options.getTargetDir());
+ }
+
+ Path logsPath = new Path(tablePath, "_logs");
+ if (fs.exists(logsPath)) {
+ LOG.info("Removing temporary files from import process: " + logsPath);
+ if (!fs.delete(logsPath, true)) {
+ LOG.warn("Could not delete temporary files; "
+ + "continuing with import, but it may fail.");
+ }
+ }
+ }
+
+ /**
+ * @return true if we're just generating the DDL for the import, but
+ * not actually running it (i.e., --generate-only mode). If so, don't
+ * do any side-effecting actions in Hive.
+ */
+ private boolean isGenerateOnly() {
+ return generateOnly;
+ }
+
+ /**
+ * @return a File object that can be used to write the DDL statement.
+ * If we're in gen-only mode, this should be a file in the outdir, named
+ * after the Hive table we're creating. If we're in import mode, this should
+ * be a one-off temporary file.
+ */
+ private File getScriptFile(String outputTableName) throws IOException {
+ if (!isGenerateOnly()) {
+ return File.createTempFile("hive-script-", ".txt",
+ new File(options.getTempDir()));
+ } else {
+ return new File(new File(options.getCodeOutputDir()),
+ outputTableName + ".q");
+ }
+ }
+
+ /**
+ * Perform the import of data from an HDFS path to a Hive table.
+ *
+ * @param inputTableName the name of the table as loaded into HDFS
+ * @param outputTableName the name of the table to create in Hive.
+ * @param createOnly if true, run the CREATE TABLE statement but not
+ * LOAD DATA.
+ */
+ public void importTable(String inputTableName, String outputTableName,
+ boolean createOnly) throws IOException {
+
+ if (!isGenerateOnly()) {
+ removeTempLogs(inputTableName);
+ LOG.info("Loading uploaded data into Hive");
+ }
+
+ if (null == outputTableName) {
+ outputTableName = inputTableName;
+ }
+ LOG.debug("Hive.inputTable: " + inputTableName);
+ LOG.debug("Hive.outputTable: " + outputTableName);
+
+ // For testing purposes against our mock hive implementation,
+ // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
+ // environment variable for the child hive process. We also disable
+ // timestamp comments so that we have deterministic table creation scripts.
+ String expectedScript = System.getProperty("expected.script");
+ List<String> env = Executor.getCurEnvpStrings();
+ boolean debugMode = expectedScript != null;
+ if (debugMode) {
+ env.add("EXPECTED_SCRIPT=" + expectedScript);
+ env.add("TMPDIR=" + options.getTempDir());
+ }
+
+ // generate the HQL statements to run.
+ TableDefWriter tableWriter = new TableDefWriter(options, connManager,
+ inputTableName, outputTableName,
+ configuration, !debugMode);
+ String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
+ String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
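+
+    // As a sketch of what these hold (assuming a hypothetical table "emp"
+    // with ',' field and '\n' record delimiters; real statements are emitted
+    // on one line each):
+    //
+    //   CREATE TABLE IF NOT EXISTS `emp` ( `id` INT, `name` STRING)
+    //     ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054'
+    //     LINES TERMINATED BY '\012' STORED AS TEXTFILE;
+    //   LOAD DATA INPATH 'hdfs://namenode/user/foo/emp' INTO TABLE `emp`;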
+
+ if (!isGenerateOnly()) {
+ String codec = options.getCompressionCodec();
+ if (codec != null && (codec.equals(CodecMap.LZOP)
+ || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+ try {
+ String finalPathStr = tableWriter.getFinalPathStr();
+ Tool tool = ReflectionUtils.newInstance(Class.
+ forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+ asSubclass(Tool.class), configuration);
+ ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+ } catch (Exception ex) {
+ LOG.error("Error indexing lzo files", ex);
+ throw new IOException("Error indexing lzo files", ex);
+ }
+ }
+ }
+
+ // write them to a script file.
+ File scriptFile = getScriptFile(outputTableName);
+ try {
+ String filename = scriptFile.toString();
+ BufferedWriter w = null;
+ try {
+ FileOutputStream fos = new FileOutputStream(scriptFile);
+ w = new BufferedWriter(new OutputStreamWriter(fos));
+ w.write(createTableStr, 0, createTableStr.length());
+ if (!createOnly) {
+ w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error writing Hive load-in script: " + ioe.toString());
+ ioe.printStackTrace();
+ throw ioe;
+ } finally {
+ if (null != w) {
+ try {
+ w.close();
+ } catch (IOException ioe) {
+ LOG.warn("IOException closing stream to Hive script: "
+ + ioe.toString());
+ }
+ }
+ }
+
+ if (!isGenerateOnly()) {
+ executeScript(filename, env);
+
+ LOG.info("Hive import complete.");
+ }
+ } finally {
+ if (!isGenerateOnly()) {
+ // User isn't interested in saving the DDL. Remove the file.
+ if (!scriptFile.delete()) {
+ LOG.warn("Could not remove temporary file: " + scriptFile.toString());
+ // try to delete the file later.
+ scriptFile.deleteOnExit();
+ }
+ }
+ }
+ }
+
+  /**
+   * Execute the script file via Hive.
+   * If Hive's jars are on the classpath, run it in the same process.
+   * Otherwise, execute the file with 'bin/hive'.
+   *
+   * @param filename The script file to run.
+   * @param env the environment strings to pass to any subprocess.
+   * @throws IOException if Hive did not exit successfully.
+   */
+  @SuppressWarnings("unchecked")
+ private void executeScript(String filename, List<String> env)
+ throws IOException {
+ SubprocessSecurityManager subprocessSM = null;
+
+ try {
+ Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
+
+ // We loaded the CLI Driver in this JVM, so we will just
+ // call it in-process. The CliDriver class has a method:
+ // void main(String [] args) throws Exception.
+ //
+ // We'll call that here to invoke 'hive -f scriptfile'.
+ // Because this method will call System.exit(), we use
+ // a SecurityManager to prevent this.
+ LOG.debug("Using in-process Hive instance.");
+
+ subprocessSM = new SubprocessSecurityManager();
+ subprocessSM.install();
+
+ // Create the argv for the Hive Cli Driver.
+ String [] argArray = new String[2];
+ argArray[0] = "-f";
+ argArray[1] = filename;
+
+ // And invoke the static method on this array.
+ Method mainMethod = cliDriverClass.getMethod("main", argArray.getClass());
+ mainMethod.invoke(null, (Object) argArray);
+
+ } catch (ClassNotFoundException cnfe) {
+ // Hive is not on the classpath. Run externally.
+ // This is not an error path.
+ LOG.debug("Using external Hive process.");
+ executeExternalHiveScript(filename, env);
+ } catch (NoSuchMethodException nsme) {
+ // Could not find a handle to the main() method.
+ throw new IOException("Could not access CliDriver.main()", nsme);
+ } catch (IllegalAccessException iae) {
+ // Error getting a handle on the main() method.
+ throw new IOException("Could not access CliDriver.main()", iae);
+ } catch (InvocationTargetException ite) {
+ // We ran CliDriver.main() and an exception was thrown from within Hive.
+ // This may have been the ExitSecurityException triggered by the
+ // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
+ // an IOException and rethrow.
+
+ Throwable cause = ite.getCause();
+ if (cause instanceof ExitSecurityException) {
+ ExitSecurityException ese = (ExitSecurityException) cause;
+ int status = ese.getExitStatus();
+ if (status != 0) {
+ throw new IOException("Hive CliDriver exited with status=" + status);
+ }
+ } else {
+ throw new IOException("Exception thrown in Hive", ite);
+ }
+ } finally {
+ if (null != subprocessSM) {
+ // Uninstall the SecurityManager used to trap System.exit().
+ subprocessSM.uninstall();
+ }
+ }
+ }
+
+ /**
+ * Execute Hive via an external 'bin/hive' process.
+ * @param filename the Script file to run.
+ * @param env the environment strings to pass to any subprocess.
+ * @throws IOException if Hive did not exit successfully.
+ */
+ private void executeExternalHiveScript(String filename, List<String> env)
+ throws IOException {
+ // run Hive on the script and note the return code.
+ String hiveExec = getHiveBinPath();
+ ArrayList<String> args = new ArrayList<String>();
+ args.add(hiveExec);
+ args.add("-f");
+ args.add(filename);
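+
+    // The resulting command line is e.g.
+    //   /usr/lib/hive/bin/hive -f /tmp/sqoop/hive-script-1234.txt
+    // (hypothetical paths).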
+
+ LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+ int ret = Executor.exec(args.toArray(new String[0]),
+ env.toArray(new String[0]), logSink, logSink);
+ if (0 != ret) {
+ throw new IOException("Hive exited with status " + ret);
+ }
+ }
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.sql.Types;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Defines conversion between SQL types and Hive types.
+ */
+public final class HiveTypes {
+
+ public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
+
+ private HiveTypes() { }
+
+ /**
+ * Given JDBC SQL types coming from another database, what is the best
+ * mapping to a Hive-specific type?
+ */
+ public static String toHiveType(int sqlType) {
+
+ switch (sqlType) {
+ case Types.INTEGER:
+ case Types.SMALLINT:
+ return "INT";
+ case Types.VARCHAR:
+ case Types.CHAR:
+ case Types.LONGVARCHAR:
+ case Types.NVARCHAR:
+ case Types.NCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ case Types.CLOB:
+ return "STRING";
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ case Types.REAL:
+ return "DOUBLE";
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return "BOOLEAN";
+ case Types.TINYINT:
+ return "TINYINT";
+ case Types.BIGINT:
+ return "BIGINT";
+ default:
+ // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
+ // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
+ return null;
+ }
+ }
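+
+  // For example, toHiveType(Types.VARCHAR) returns "STRING",
+  // toHiveType(Types.NUMERIC) returns "DOUBLE" (a lossy mapping; see
+  // isHiveTypeImprovised below), and toHiveType(Types.BLOB) returns null.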
+
+  /**
+   * @return true if a SQL type can't be translated to a precise match
+   * in Hive, and we have to cast it to something more generic.
+   */
+ public static boolean isHiveTypeImprovised(int sqlType) {
+ return sqlType == Types.DATE || sqlType == Types.TIME
+ || sqlType == Types.TIMESTAMP
+ || sqlType == Types.DECIMAL
+ || sqlType == Types.NUMERIC;
+ }
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Date;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.io.CodecMap;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+
+/**
+ * Creates (Hive-specific) SQL DDL statements to create tables to hold data
+ * we're importing from another source.
+ *
+ * After we import the database into HDFS, we can inject it into Hive using
+ * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
+ */
+public class TableDefWriter {
+
+ public static final Log LOG = LogFactory.getLog(
+ TableDefWriter.class.getName());
+
+ private SqoopOptions options;
+ private ConnManager connManager;
+ private Configuration configuration;
+ private String inputTableName;
+ private String outputTableName;
+  private boolean commentsEnabled;
+  private Map<String, Integer> externalColTypes;
+
+ /**
+ * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
+ * @param opts program-wide options
+ * @param connMgr the connection manager used to describe the table.
+ * @param inputTable the name of the table to load.
+ * @param outputTable the name of the Hive table to create.
+ * @param config the Hadoop configuration to use to connect to the dfs
+ * @param withComments if true, then tables will be created with a
+ * timestamp comment.
+ */
+ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
+ final String inputTable, final String outputTable,
+ final Configuration config, final boolean withComments) {
+ this.options = opts;
+ this.connManager = connMgr;
+ this.inputTableName = inputTable;
+ this.outputTableName = outputTable;
+ this.configuration = config;
+ this.commentsEnabled = withComments;
+ }
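+
+  // Illustrative use (argument values are hypothetical); this is how
+  // HiveImport drives the class:
+  //
+  //   TableDefWriter writer = new TableDefWriter(options, connManager,
+  //       "emp", "emp_hive", conf, true);
+  //   String createTable = writer.getCreateTableStmt();
+  //   String loadData = writer.getLoadDataStmt();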
+
+ /**
+ * Set the column type map to be used.
+ * (dependency injection for testing; not used in production.)
+ */
+ public void setColumnTypes(Map<String, Integer> colTypes) {
+ this.externalColTypes = colTypes;
+ LOG.debug("Using test-controlled type map");
+ }
+
+ /**
+ * Get the column names to import.
+ */
+ private String [] getColumnNames() {
+ String [] colNames = options.getColumns();
+ if (null != colNames) {
+ return colNames; // user-specified column names.
+ } else if (null != externalColTypes) {
+ // Test-injection column mapping. Extract the col names from this.
+ ArrayList<String> keyList = new ArrayList<String>();
+ for (String key : externalColTypes.keySet()) {
+ keyList.add(key);
+ }
+
+ return keyList.toArray(new String[keyList.size()]);
+ } else if (null != inputTableName) {
+ return connManager.getColumnNames(inputTableName);
+ } else {
+ return connManager.getColumnNamesForQuery(options.getSqlQuery());
+ }
+ }
+
+ /**
+ * @return the CREATE TABLE statement for the table to load into hive.
+ */
+ public String getCreateTableStmt() throws IOException {
+ Map<String, Integer> columnTypes;
+ Properties userMapping = options.getMapColumnHive();
+
+ if (externalColTypes != null) {
+ // Use pre-defined column types.
+ columnTypes = externalColTypes;
+ } else {
+ // Get these from the database.
+ if (null != inputTableName) {
+ columnTypes = connManager.getColumnTypes(inputTableName);
+ } else {
+ columnTypes = connManager.getColumnTypesForQuery(options.getSqlQuery());
+ }
+ }
+
+ String [] colNames = getColumnNames();
+ StringBuilder sb = new StringBuilder();
+ if (options.doFailIfHiveTableExists()) {
+ sb.append("CREATE TABLE `").append(outputTableName).append("` ( ");
+ } else {
+ sb.append("CREATE TABLE IF NOT EXISTS `");
+ sb.append(outputTableName).append("` ( ");
+ }
+
+    // Check that all explicitly mapped columns are present in the result set.
+    for (Object column : userMapping.keySet()) {
+      boolean found = false;
+      for (String c : colNames) {
+        if (c.equals(column)) {
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("No column by the name " + column
+          + " found while importing data");
+      }
+    }
+
+ boolean first = true;
+ for (String col : colNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ first = false;
+
+ Integer colType = columnTypes.get(col);
+ String hiveColType = userMapping.getProperty(col);
+      if (hiveColType == null) {
+        hiveColType = connManager.toHiveType(colType);
+      }
+ if (null == hiveColType) {
+ throw new IOException("Hive does not support the SQL type for column "
+ + col);
+ }
+
+ sb.append('`').append(col).append("` ").append(hiveColType);
+
+ if (HiveTypes.isHiveTypeImprovised(colType)) {
+ LOG.warn(
+ "Column " + col + " had to be cast to a less precise type in Hive");
+ }
+ }
+
+ sb.append(") ");
+
+ if (commentsEnabled) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ String curDateStr = dateFormat.format(new Date());
+ sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
+ }
+
+ if (options.getHivePartitionKey() != null) {
+ sb.append("PARTITIONED BY (")
+ .append(options.getHivePartitionKey())
+ .append(" STRING) ");
+ }
+
+ sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
+ sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
+ sb.append("' LINES TERMINATED BY '");
+ sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
+ String codec = options.getCompressionCodec();
+ if (codec != null && (codec.equals(CodecMap.LZOP)
+ || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+ sb.append("' STORED AS INPUTFORMAT "
+ + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+ sb.append(" OUTPUTFORMAT "
+ + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+ } else {
+ sb.append("' STORED AS TEXTFILE");
+ }
+
+ LOG.debug("Create statement: " + sb.toString());
+ return sb.toString();
+ }
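+
+  // For instance (hypothetical table and key names, assuming ',' field and
+  // '\n' record delimiters), importing "emp" with --hive-partition-key dt
+  // yields a statement ending in:
+  //   ... PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS
+  //   TERMINATED BY '\054' LINES TERMINATED BY '\012' STORED AS TEXTFILE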
+
+ /**
+ * @return the LOAD DATA statement to import the data in HDFS into hive.
+ */
+ public String getLoadDataStmt() throws IOException {
+ String finalPathStr = getFinalPathStr();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("LOAD DATA INPATH '");
+    sb.append(finalPathStr).append("'");
+ if (options.doOverwriteHiveTable()) {
+ sb.append(" OVERWRITE");
+ }
+ sb.append(" INTO TABLE `");
+ sb.append(outputTableName);
+ sb.append('`');
+
+ if (options.getHivePartitionKey() != null) {
+ sb.append(" PARTITION (")
+ .append(options.getHivePartitionKey())
+ .append("='").append(options.getHivePartitionValue())
+ .append("')");
+ }
+
+ LOG.debug("Load statement: " + sb.toString());
+ return sb.toString();
+ }
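+
+  // For instance (hypothetical path and names), with --hive-overwrite and a
+  // partition key/value set this produces:
+  //   LOAD DATA INPATH 'hdfs://namenode/user/foo/emp' OVERWRITE
+  //   INTO TABLE `emp` PARTITION (dt='2011-10-31')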
+
+ public String getFinalPathStr() throws IOException {
+ String warehouseDir = options.getWarehouseDir();
+ if (null == warehouseDir) {
+ warehouseDir = "";
+ } else if (!warehouseDir.endsWith(File.separator)) {
+ warehouseDir = warehouseDir + File.separator;
+ }
+
+ String tablePath;
+ if (null != inputTableName) {
+ tablePath = warehouseDir + inputTableName;
+ } else {
+ tablePath = options.getTargetDir();
+ }
+ FileSystem fs = FileSystem.get(configuration);
+ Path finalPath = new Path(tablePath).makeQualified(fs);
+ return finalPath.toString();
+ }
+
+  /**
+   * Return a string identifying the character to use as a delimiter
+   * in Hive, in octal representation.
+   * Hive can specify delimiter characters in the form '\ooo' where
+   * ooo is a three-digit octal number between 000 and 177. Values
+   * may not be truncated ('\12' is wrong; '\012' is ok) nor padded
+   * with an extra leading zero to four digits (e.g., '\0177' is wrong).
+   *
+   * @param charNum the character to use as a delimiter
+   * @return a string of the form "\ooo" where ooo is an octal number
+   * in [000, 177].
+   * @throws IllegalArgumentException if charNum > 0177.
+   */
+ public static String getHiveOctalCharCode(int charNum) {
+ if (charNum > 0177) {
+ throw new IllegalArgumentException(
+ "Character " + charNum + " is an out-of-range delimiter");
+ }
+
+ return String.format("\\%03o", charNum);
+ }
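+
+  // For example, getHiveOctalCharCode(',') returns "\054" and
+  // getHiveOctalCharCode('\n') returns "\012".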
+
+}
+