You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2009/05/26 12:29:40 UTC
svn commit: r778646 [2/3] - in /hadoop/core/trunk: ./ src/contrib/sqoop/
src/contrib/sqoop/ivy/ src/contrib/sqoop/src/ src/contrib/sqoop/src/java/
src/contrib/sqoop/src/java/org/ src/contrib/sqoop/src/java/org/apache/
src/contrib/sqoop/src/java/org/apa...
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Tue May 26 10:29:38 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+
+import org.apache.hadoop.sqoop.ConnFactory;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
+ *
+ *
+ *
+ */
+public class ImportJob {
+
+ public static final Log LOG = LogFactory.getLog(ImportJob.class.getName());
+
+ private ImportOptions options;
+
+ public ImportJob(final ImportOptions opts) {
+ this.options = opts;
+ }
+
+ /**
+ * Run an import job to read a table in to HDFS
+ *
+ * @param tableName the database table to read
+ * @param ormJarFile the Jar file to insert into the dcache classpath. (may be null)
+ * @param orderByCol the column of the database table to use to order the import
+ * @param conf A fresh Hadoop Configuration to use to build an MR job.
+ */
+ public void runImport(String tableName, String ormJarFile, String orderByCol,
+ Configuration conf) throws IOException {
+
+ LOG.info("Beginning data import of " + tableName);
+
+ // TODO(aaron): If we add packages, the tableName will not be the class name.
+ String tableClassName = tableName;
+
+ boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
+ ClassLoader prevClassLoader = null;
+ if (isLocal) {
+ // If we're using the LocalJobRunner, then instead of using the compiled jar file
+ // as the job source, we're running in the current thread. Push on another classloader
+ // that loads from that jar in addition to everything currently on the classpath.
+
+ // take advantage of the fact that table name = class name.
+ prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
+ }
+
+ try {
+ JobConf job = new JobConf(conf);
+ job.setJar(ormJarFile);
+
+ String hdfsWarehouseDir = options.getWarehouseDir();
+ Path outputPath;
+
+ if (null != hdfsWarehouseDir) {
+ Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
+ hdfsWarehousePath.makeQualified(FileSystem.get(job));
+ outputPath = new Path(hdfsWarehousePath, tableName);
+ } else {
+ outputPath = new Path(tableName);
+ }
+
+ if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
+ job.setMapperClass(TextImportMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+ } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ job.set("mapred.output.value.class", tableClassName);
+ } else {
+ LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
+ }
+
+ job.setNumReduceTasks(0);
+ job.setInputFormat(DBInputFormat.class);
+
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ ConnManager mgr = ConnFactory.getManager(options);
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString(),
+ username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ // We can't set the class properly in here, because we may not have the
+ // jar loaded in this JVM. So we start by calling setInput() with DBWritable,
+ // and then overriding the string manually.
+ DBInputFormat.setInput(job, DBWritable.class, tableName, null,
+ orderByCol, colNames);
+ job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
+
+ JobClient.runJob(job);
+ } finally {
+ if (isLocal && null != prevClassLoader) {
+ // unload the special classloader for this jar.
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+ }
+ }
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/TextImportMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/TextImportMapper.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/TextImportMapper.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/TextImportMapper.java Tue May 26 10:29:38 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+
+/**
+ * Converts an input record into a string representation and emit it.
+ *
+ *
+ */
+public class TextImportMapper extends MapReduceBase
+ implements Mapper<LongWritable, DBWritable, Text, NullWritable> {
+
+ private Text outkey;
+
+ public TextImportMapper() {
+ outkey = new Text();
+ }
+
+ public void map(LongWritable key, DBWritable val, OutputCollector<Text, NullWritable> output,
+ Reporter reporter) throws IOException {
+
+ outkey.set(val.toString());
+ output.collect(outkey, NullWritable.get());
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java Tue May 26 10:29:38 2009
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.orm;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
+import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Creates an ORM class to represent a table from a database
+ *
+ *
+ *
+ */
+public class ClassWriter {
+
+ public static final Log LOG = LogFactory.getLog(ClassWriter.class.getName());
+
+ /**
+ * This version number is injected into all generated Java classes to denote
+ * which version of the ClassWriter's output format was used to generate the
+ * class.
+ *
+ * If the way that we generate classes, bump this number.
+ */
+ public static final int CLASS_WRITER_VERSION = 1;
+
+ private ImportOptions options;
+ private ConnManager connManager;
+ private String tableName;
+ private CompilationManager compileManager;
+
+ /**
+ * Creates a new ClassWriter to generate an ORM class for a table.
+ * @param opts program-wide options
+ * @param connMgr the connection manager used to describe the table.
+ * @param table the name of the table to read.
+ */
+ public ClassWriter(final ImportOptions opts, final ConnManager connMgr,
+ final String table, final CompilationManager compMgr) {
+ this.options = opts;
+ this.connManager = connMgr;
+ this.tableName = table;
+ this.compileManager = compMgr;
+ }
+
+
+ /**
+ * @param javaType
+ * @return the name of the method of JdbcWritableBridge to read an entry with a given java type.
+ */
+ private String dbGetterForType(String javaType) {
+ // All Class-based types (e.g., java.math.BigDecimal) are handled with
+ // "readBar" where some.package.foo.Bar is the canonical class name.
+ // Turn the javaType string into the getter type string.
+
+ String [] parts = javaType.split("\\.");
+ if (parts.length == 0) {
+ LOG.error("No ResultSet method for Java type " + javaType);
+ return null;
+ }
+
+ String lastPart = parts[parts.length - 1];
+ try {
+ String getter = "read" + Character.toUpperCase(lastPart.charAt(0)) + lastPart.substring(1);
+ return getter;
+ } catch (StringIndexOutOfBoundsException oob) {
+ // lastPart.*() doesn't work on empty strings.
+ LOG.error("Could not infer JdbcWritableBridge getter for Java type " + javaType);
+ return null;
+ }
+ }
+
+ /**
+ * @param javaType
+ * @return the name of the method of JdbcWritableBridge to write an entry with a given java type.
+ */
+ private String dbSetterForType(String javaType) {
+ // TODO(aaron): Lots of unit tests needed here.
+ // See dbGetterForType() for the logic used here; it's basically the same.
+
+ String [] parts = javaType.split("\\.");
+ if (parts.length == 0) {
+ LOG.error("No PreparedStatement Set method for Java type " + javaType);
+ return null;
+ }
+
+ String lastPart = parts[parts.length - 1];
+ try {
+ String setter = "write" + Character.toUpperCase(lastPart.charAt(0)) + lastPart.substring(1);
+ return setter;
+ } catch (StringIndexOutOfBoundsException oob) {
+ // lastPart.*() doesn't work on empty strings.
+ LOG.error("Could not infer PreparedStatement setter for Java type " + javaType);
+ return null;
+ }
+ }
+
+ private String stringifierForType(String javaType, String colName) {
+ if (javaType.equals("String")) {
+ return colName;
+ } else {
+ // this is an object type -- just call its toString() in a null-safe way.
+ return "\"\" + " + colName;
+ }
+ }
+
+ /**
+ * @param javaType the type to read
+ * @param inputObj the name of the DataInput to read from
+ * @param colName the column name to read
+ * @return the line of code involving a DataInput object to read an entry with a given java type.
+ */
+ private String rpcGetterForType(String javaType, String inputObj, String colName) {
+ if (javaType.equals("Integer")) {
+ return " this." + colName + " = Integer.valueOf(" + inputObj + ".readInt());\n";
+ } else if (javaType.equals("Long")) {
+ return " this." + colName + " = Long.valueOf(" + inputObj + ".readLong());\n";
+ } else if (javaType.equals("Float")) {
+ return " this." + colName + " = Float.valueOf(" + inputObj + ".readFloat());\n";
+ } else if (javaType.equals("Double")) {
+ return " this." + colName + " = Double.valueOf(" + inputObj + ".readDouble());\n";
+ } else if (javaType.equals("Boolean")) {
+ return " this." + colName + " = Boolean.valueOf(" + inputObj + ".readBoolean());\n";
+ } else if (javaType.equals("String")) {
+ return " this." + colName + " = Text.readString(" + inputObj + ");\n";
+ } else if (javaType.equals("java.sql.Date")) {
+ return " this." + colName + " = new Date(" + inputObj + ".readLong());\n";
+ } else if (javaType.equals("java.sql.Time")) {
+ return " this." + colName + " = new Time(" + inputObj + ".readLong());\n";
+ } else if (javaType.equals("java.sql.Timestamp")) {
+ return " this." + colName + " = new Timestamp(" + inputObj + ".readLong());\n"
+ + " this." + colName + ".setNanos(" + inputObj + ".readInt());\n";
+ } else if (javaType.equals("java.math.BigDecimal")) {
+ return " this." + colName + " = " + BigDecimalSerializer.class.getCanonicalName()
+ + ".readFields(" + inputObj + ");\n";
+ } else {
+ LOG.error("No ResultSet method for Java type " + javaType);
+ return null;
+ }
+ }
+
+ /**
+ * Deserialize a possibly-null value from the DataInput stream
+ * @param javaType name of the type to deserialize if it's not null.
+ * @param inputObj name of the DataInput to read from
+ * @param colName the column name to read.
+ * @return
+ */
+ private String rpcGetterForMaybeNull(String javaType, String inputObj, String colName) {
+ return " if (" + inputObj + ".readBoolean()) { \n"
+ + " this." + colName + " = null;\n"
+ + " } else {\n"
+ + rpcGetterForType(javaType, inputObj, colName)
+ + " }\n";
+ }
+
+ /**
+ * @param javaType the type to write
+ * @param inputObj the name of the DataOutput to write to
+ * @param colName the column name to write
+ * @return the line of code involving a DataOutput object to write an entry with
+ * a given java type.
+ */
+ private String rpcSetterForType(String javaType, String outputObj, String colName) {
+ if (javaType.equals("Integer")) {
+ return " " + outputObj + ".writeInt(this." + colName + ");\n";
+ } else if (javaType.equals("Long")) {
+ return " " + outputObj + ".writeLong(this." + colName + ");\n";
+ } else if (javaType.equals("Boolean")) {
+ return " " + outputObj + ".writeBoolean(this." + colName + ");\n";
+ } else if (javaType.equals("Float")) {
+ return " " + outputObj + ".writeFloat(this." + colName + ");\n";
+ } else if (javaType.equals("Double")) {
+ return " " + outputObj + ".writeDouble(this." + colName + ");\n";
+ } else if (javaType.equals("String")) {
+ return " Text.writeString(" + outputObj + ", " + colName + ");\n";
+ } else if (javaType.equals("java.sql.Date")) {
+ return " " + outputObj + ".writeLong(this." + colName + ".getTime());\n";
+ } else if (javaType.equals("java.sql.Time")) {
+ return " " + outputObj + ".writeLong(this." + colName + ".getTime());\n";
+ } else if (javaType.equals("java.sql.Timestamp")) {
+ return " " + outputObj + ".writeLong(this." + colName + ".getTime());\n"
+ + " " + outputObj + ".writeInt(this." + colName + ".getNanos());\n";
+ } else if (javaType.equals("java.math.BigDecimal")) {
+ return " " + BigDecimalSerializer.class.getCanonicalName()
+ + ".write(this." + colName + ", " + outputObj + ");\n";
+ } else {
+ LOG.error("No ResultSet method for Java type " + javaType);
+ return null;
+ }
+ }
+
+ /**
+ * Serialize a possibly-null value to the DataOutput stream. First a boolean
+ * isNull is written, followed by the contents itself (if not null).
+ * @param javaType name of the type to deserialize if it's not null.
+ * @param inputObj name of the DataInput to read from
+ * @param colName the column name to read.
+ * @return
+ */
+ private String rpcSetterForMaybeNull(String javaType, String outputObj, String colName) {
+ return " if (null == this." + colName + ") { \n"
+ + " " + outputObj + ".writeBoolean(true);\n"
+ + " } else {\n"
+ + " " + outputObj + ".writeBoolean(false);\n"
+ + rpcSetterForType(javaType, outputObj, colName)
+ + " }\n";
+ }
+
+ /**
+ * Generate a member field and getter method for each column
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateFields(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ for (String col : colNames) {
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("Cannot resolve SQL type " + sqlType);
+ continue;
+ }
+
+ sb.append(" private " + javaType + " " + col + ";\n");
+ sb.append(" public " + javaType + " get_" + col + "() {\n");
+ sb.append(" return " + col + ";\n");
+ sb.append(" }\n");
+ }
+ }
+
+ /**
+ * Generate the readFields() method used by the database
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateDbRead(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ sb.append(" public void readFields(ResultSet __dbResults) throws SQLException {\n");
+
+ int fieldNum = 0;
+
+ for (String col : colNames) {
+ fieldNum++;
+
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("No Java type for SQL type " + sqlType);
+ continue;
+ }
+
+ String getterMethod = dbGetterForType(javaType);
+ if (null == getterMethod) {
+ LOG.error("No db getter method for Java type " + javaType);
+ continue;
+ }
+
+ sb.append(" this." + col + " = JdbcWritableBridge." + getterMethod
+ + "(" + fieldNum + ", __dbResults);\n");
+ }
+
+ sb.append(" }\n");
+ }
+
+
+ /**
+ * Generate the write() method used by the database
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateDbWrite(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ sb.append(" public void write(PreparedStatement __dbStmt) throws SQLException {\n");
+
+ int fieldNum = 0;
+
+ for (String col : colNames) {
+ fieldNum++;
+
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("No Java type for SQL type " + sqlType);
+ continue;
+ }
+
+ String setterMethod = dbSetterForType(javaType);
+ if (null == setterMethod) {
+ LOG.error("No db setter method for Java type " + javaType);
+ continue;
+ }
+
+ sb.append(" JdbcWritableBridge." + setterMethod + "(" + col + ", "
+ + fieldNum + ", " + sqlType + ", __dbStmt);\n");
+ }
+
+ sb.append(" }\n");
+ }
+
+
+ /**
+ * Generate the readFields() method used by the Hadoop RPC system
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ sb.append(" public void readFields(DataInput __dataIn) throws IOException {\n");
+
+ for (String col : colNames) {
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("No Java type for SQL type " + sqlType);
+ continue;
+ }
+
+ String getterMethod = rpcGetterForMaybeNull(javaType, "__dataIn", col);
+ if (null == getterMethod) {
+ LOG.error("No RPC getter method for Java type " + javaType);
+ continue;
+ }
+
+ sb.append(getterMethod);
+ }
+
+ sb.append(" }\n");
+ }
+
+ /**
+ * Generate the toString() method
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateToString(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ sb.append(" public String toString() {\n");
+ sb.append(" StringBuilder sb = new StringBuilder();\n");
+
+ boolean first = true;
+ for (String col : colNames) {
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("No Java type for SQL type " + sqlType);
+ continue;
+ }
+
+ if (!first) {
+ // TODO(aaron): Support arbitrary record delimiters
+ sb.append(" sb.append(\",\");\n");
+ }
+
+ first = false;
+
+ String stringExpr = stringifierForType(javaType, col);
+ if (null == stringExpr) {
+ LOG.error("No toString method for Java type " + javaType);
+ continue;
+ }
+
+ sb.append(" sb.append(" + stringExpr + ");\n");
+
+ }
+
+ sb.append(" return sb.toString();\n");
+ sb.append(" }\n");
+ }
+
+ /**
+ * Generate the write() method used by the Hadoop RPC system
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateHadoopWrite(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ sb.append(" public void write(DataOutput __dataOut) throws IOException {\n");
+
+ for (String col : colNames) {
+ int sqlType = columnTypes.get(col);
+ String javaType = connManager.toJavaType(sqlType);
+ if (null == javaType) {
+ LOG.error("No Java type for SQL type " + sqlType);
+ continue;
+ }
+
+ String setterMethod = rpcSetterForMaybeNull(javaType, "__dataOut", col);
+ if (null == setterMethod) {
+ LOG.error("No RPC setter method for Java type " + javaType);
+ continue;
+ }
+
+ sb.append(setterMethod);
+ }
+
+ sb.append(" }\n");
+ }
+ /**
+ * Generate the ORM code for the class.
+ */
+ public void generate() throws IOException {
+ Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = connManager.getColumnNames(tableName);
+ }
+
+ // Generate the Java code
+ StringBuilder sb = generateClassForColumns(columnTypes, colNames);
+
+ // Write this out to a file.
+ String codeOutDir = options.getCodeOutputDir();
+
+ // TODO(aaron): Allow package subdirectory (that goes in sourceFilename).
+ String sourceFilename = tableName + ".java";
+ String filename = codeOutDir + sourceFilename;
+
+ LOG.debug("Writing source file: " + filename);
+ LOG.debug("Table name: " + tableName);
+ StringBuilder sbColTypes = new StringBuilder();
+ for (String col : colNames) {
+ Integer colType = columnTypes.get(col);
+ sbColTypes.append(col + ":" + colType + ", ");
+ }
+ String colTypeStr = sbColTypes.toString();
+ LOG.debug("Columns: " + colTypeStr);
+
+ compileManager.addSourceFile(sourceFilename);
+
+ // Create any missing parent directories.
+ File file = new File(filename);
+ String dirname = file.getParent();
+ if (null != dirname) {
+ boolean mkdirSuccess = new File(dirname).mkdirs();
+ if (!mkdirSuccess) {
+ LOG.debug("Could not create directory tree for " + dirname);
+ }
+ }
+
+ OutputStream ostream = null;
+ Writer writer = null;
+ try {
+ ostream = new FileOutputStream(filename);
+ writer = new OutputStreamWriter(ostream);
+ writer.append(sb.toString());
+ } finally {
+ if (null != writer) {
+ try {
+ writer.close();
+ } catch (IOException ioe) {
+ // ignored because we're closing.
+ }
+ }
+
+ if (null != ostream) {
+ try {
+ ostream.close();
+ } catch (IOException ioe) {
+ // ignored because we're closing.
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate the ORM code for a table object containing the named columns
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @return - A StringBuilder that contains the text of the class code.
+ */
+ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
+ String [] colNames) {
+ StringBuilder sb = new StringBuilder();
+ // TODO(aaron): Emit package name.
+ sb.append("// ORM class for " + tableName + "\n");
+ sb.append("// WARNING: This class is AUTO-GENERATED. Modify at your own risk.\n");
+
+ sb.append("import org.apache.hadoop.io.Text;\n");
+ sb.append("import org.apache.hadoop.io.Writable;\n");
+ sb.append("import org.apache.hadoop.mapred.lib.db.DBWritable;\n");
+ sb.append("import " + JdbcWritableBridge.class.getCanonicalName() + ";\n");
+ sb.append("import java.sql.PreparedStatement;\n");
+ sb.append("import java.sql.ResultSet;\n");
+ sb.append("import java.sql.SQLException;\n");
+ sb.append("import java.io.DataInput;\n");
+ sb.append("import java.io.DataOutput;\n");
+ sb.append("import java.io.IOException;\n");
+ sb.append("import java.sql.Date;\n");
+ sb.append("import java.sql.Time;\n");
+ sb.append("import java.sql.Timestamp;\n");
+
+ // TODO(aaron): Allow different table/class names.
+ sb.append("public class " + tableName + " implements DBWritable, Writable {\n");
+ sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n");
+ generateFields(columnTypes, colNames, sb);
+ generateDbRead(columnTypes, colNames, sb);
+ generateDbWrite(columnTypes, colNames, sb);
+ generateHadoopRead(columnTypes, colNames, sb);
+ generateHadoopWrite(columnTypes, colNames, sb);
+ generateToString(columnTypes, colNames, sb);
+ // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
+
+ sb.append("}\n");
+
+ return sb;
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Tue May 26 10:29:38 2009
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.orm;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.util.FileListing;
+
+/**
+ * Manages the compilation of a bunch of .java files into .class files
+ * and eventually a jar.
+ *
+ * Also embeds this program's jar into the lib/ directory inside the compiled jar
+ * to ensure that the job runs correctly.
+ *
+ *
+ *
+ */
+public class CompilationManager {
+
+ public static final Log LOG = LogFactory.getLog(CompilationManager.class.getName());
+
+ private ImportOptions options;
+ private List<String> sources;
+
+ public CompilationManager(final ImportOptions opts) {
+ options = opts;
+ sources = new ArrayList<String>();
+ }
+
+ public void addSourceFile(String sourceName) {
+ sources.add(sourceName);
+ }
+
+ /**
+ * locate the hadoop-*-core.jar in $HADOOP_HOME or --hadoop-home.
+ * If that doesn't work, check our classpath.
+ * @return the filename of the hadoop-*-core.jar file.
+ */
+ private String findHadoopCoreJar() {
+ String hadoopHome = options.getHadoopHome();
+
+ if (null == hadoopHome) {
+ LOG.info("$HADOOP_HOME is not set");
+ return findJarForClass(JobConf.class);
+ }
+
+ if (!hadoopHome.endsWith(File.separator)) {
+ hadoopHome = hadoopHome + File.separator;
+ }
+
+ File hadoopHomeFile = new File(hadoopHome);
+ LOG.info("HADOOP_HOME is " + hadoopHomeFile.getAbsolutePath());
+ File [] entries = hadoopHomeFile.listFiles();
+
+ if (null == entries) {
+ LOG.warn("HADOOP_HOME appears empty or missing");
+ return findJarForClass(JobConf.class);
+ }
+
+ for (File f : entries) {
+ if (f.getName().startsWith("hadoop-") && f.getName().endsWith("-core.jar")) {
+ LOG.info("Found hadoop core jar at: " + f.getAbsolutePath());
+ return f.getAbsolutePath();
+ }
+ }
+
+ return findJarForClass(JobConf.class);
+ }
+
+ /**
+ * Compile the .java files into .class files via embedded javac call.
+ */
+ public void compile() throws IOException {
+ List<String> args = new ArrayList<String>();
+
+ // ensure that the jar output dir exists.
+ String jarOutDir = options.getJarOutputDir();
+ boolean mkdirSuccess = new File(jarOutDir).mkdirs();
+ if (!mkdirSuccess) {
+ LOG.debug("Warning: Could not make directories for " + jarOutDir);
+ }
+
+ // find hadoop-*-core.jar for classpath.
+ String coreJar = findHadoopCoreJar();
+ if (null == coreJar) {
+ // Couldn't find a core jar to insert into the CP for compilation.
+ // If, however, we're running this from a unit test, then the path
+ // to the .class files might be set via the hadoop.alt.classpath property
+ // instead. Check there first.
+ String coreClassesPath = System.getProperty("hadoop.alt.classpath");
+ if (null == coreClassesPath) {
+ // no -- we're out of options. Fail.
+ throw new IOException("Could not find hadoop core jar!");
+ } else {
+ coreJar = coreClassesPath;
+ }
+ }
+
+ String curClasspath = System.getProperty("java.class.path");
+
+ args.add("-sourcepath");
+ String srcOutDir = options.getCodeOutputDir();
+ args.add(srcOutDir);
+
+ args.add("-d");
+ args.add(jarOutDir);
+
+ args.add("-classpath");
+ args.add(curClasspath + File.pathSeparator + coreJar);
+
+ // add all the source files
+ for (String srcfile : sources) {
+ args.add(srcOutDir + srcfile);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (String arg : args) {
+ sb.append(arg + " ");
+ }
+
+ // NOTE(aaron): Usage is at http://java.sun.com/j2se/1.5.0/docs/tooldocs/solaris/javac.html
+ LOG.info("Invoking javac with args: " + sb.toString());
+ int javacRet = com.sun.tools.javac.Main.compile(args.toArray(new String[0]));
+ if (javacRet != 0) {
+ throw new IOException("javac exited with status " + javacRet);
+ }
+ }
+
+ public String getJarFilename() {
+ String jarOutDir = options.getJarOutputDir();
+ String tableName = options.getTableName();
+ if (null != tableName && tableName.length() > 0) {
+ return jarOutDir + tableName + ".jar";
+ } else if (this.sources.size() == 1) {
+ // if we only have one source file, find it's base name,
+ // turn "foo.java" into "foo", and then return jarDir + "foo" + ".jar"
+ String srcFileName = this.sources.get(0);
+ String basename = new File(srcFileName).getName();
+ String [] parts = basename.split("\\.");
+ String preExtPart = parts[0];
+ return jarOutDir + preExtPart + ".jar";
+ } else {
+ return jarOutDir + "sqoop.jar";
+ }
+ }
+
+ /**
+ * Create an output jar file to use when executing MapReduce jobs
+ */
+ public void jar() throws IOException {
+ String jarOutDir = options.getJarOutputDir();
+ List<File> outDirEntries = FileListing.getFileListing(new File(jarOutDir));
+
+ String jarFilename = getJarFilename();
+
+ LOG.info("Writing jar file: " + jarFilename);
+
+ findThisJar();
+ File jarFileObj = new File(jarFilename);
+ if (jarFileObj.exists()) {
+ if (!jarFileObj.delete()) {
+ LOG.warn("Could not remove existing jar file: " + jarFilename);
+ }
+ }
+
+ FileOutputStream fstream = null;
+ JarOutputStream jstream = null;
+ try {
+ fstream = new FileOutputStream(jarFilename);
+ jstream = new JarOutputStream(fstream);
+
+ // for each input class file, create a zipfile entry for it,
+ // read the file into a buffer, and write it to the jar file.
+
+ for (File entry : outDirEntries) {
+ if (entry.equals(jarFileObj)) {
+ // don't include our own jar!
+ continue;
+ } else if (entry.isDirectory()) {
+ // don't write entries for directories
+ continue;
+ } else {
+ String fileName = entry.getName();
+
+ boolean include = fileName.endsWith(".class")
+ && sources.contains(
+ fileName.substring(0, fileName.length() - ".class".length()) + ".java");
+
+ if (include) {
+ // include this file.
+
+ // chomp off the portion of the full path that is shared
+ // with the base directory where class files were put;
+ // we only record the subdir parts in the zip entry.
+ String fullPath = entry.getAbsolutePath();
+ String chompedPath = fullPath.substring(jarOutDir.length());
+
+ LOG.debug("Got classfile: " + entry.getPath() + " -> " + chompedPath);
+ ZipEntry ze = new ZipEntry(chompedPath);
+ jstream.putNextEntry(ze);
+ copyFileToStream(entry, jstream);
+ jstream.closeEntry();
+ }
+ }
+ }
+
+ // put our own jar in there in its lib/ subdir
+ String thisJarFile = findThisJar();
+ if (null != thisJarFile) {
+ File thisJarFileObj = new File(thisJarFile);
+ String thisJarBasename = thisJarFileObj.getName();
+ String thisJarEntryName = "lib" + File.separator + thisJarBasename;
+ ZipEntry ze = new ZipEntry(thisJarEntryName);
+ jstream.putNextEntry(ze);
+ copyFileToStream(thisJarFileObj, jstream);
+ jstream.closeEntry();
+ } else {
+ // couldn't find our own jar (we were running from .class files?)
+ LOG.warn("Could not find jar for Sqoop; MapReduce jobs may not run correctly.");
+ }
+ } finally {
+ IOUtils.closeStream(jstream);
+ IOUtils.closeStream(fstream);
+ }
+ }
+
+
+ private static final int BUFFER_SZ = 4096;
+
+ /**
+ * utility method to copy a .class file into the jar stream.
+ * @param f
+ * @param ostream
+ * @throws IOException
+ */
+ private void copyFileToStream(File f, OutputStream ostream) throws IOException {
+ FileInputStream fis = new FileInputStream(f);
+ byte [] buffer = new byte[BUFFER_SZ];
+ try {
+ while (true) {
+ int bytesReceived = fis.read(buffer);
+ if (bytesReceived < 1) {
+ break;
+ }
+
+ ostream.write(buffer, 0, bytesReceived);
+ }
+ } finally {
+ fis.close();
+ }
+ }
+
+ private String findThisJar() {
+ return findJarForClass(CompilationManager.class);
+ }
+
+ // method mostly cloned from o.a.h.mapred.JobConf.findContainingJar()
+ private String findJarForClass(Class<? extends Object> classObj) {
+ ClassLoader loader = classObj.getClassLoader();
+ String classFile = classObj.getName().replaceAll("\\.", "/") + ".class";
+ try {
+ for (Enumeration<URL> itr = loader.getResources(classFile);
+ itr.hasMoreElements();) {
+ URL url = (URL) itr.nextElement();
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ClassLoaderStack.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ClassLoaderStack.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ClassLoaderStack.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ClassLoaderStack.java Tue May 26 10:29:38 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Allows you to add and remove jar-files from the running JVM by
+ * instantiating classloaders for them.
+ *
+ *
+ *
+ */
+public final class ClassLoaderStack {
+
+ public static final Log LOG = LogFactory.getLog(ClassLoaderStack.class.getName());
+
+ private ClassLoaderStack() {
+ }
+
+ /**
+ * Sets the classloader for the current thread
+ */
+ public static void setCurrentClassLoader(ClassLoader cl) {
+ LOG.info("Restoring classloader: " + cl.toString());
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+
+ /**
+ * Adds a ClassLoader to the top of the stack that will load from the Jar file
+ * of your choice. Returns the previous classloader so you can restore it
+ * if need be, later.
+ *
+ * @param jarFile The filename of a jar file that you want loaded into this JVM
+ * @param tableClassName The name of the class to load immediately (optional)
+ */
+ public static ClassLoader addJarFile(String jarFile, String testClassName)
+ throws IOException {
+
+ // load the classes from the ORM JAR file into the current VM
+ ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader();
+ String urlPath = "jar:file://" + new File(jarFile).getAbsolutePath() + "!/";
+ LOG.debug("Attempting to load jar through URL: " + urlPath);
+ LOG.debug("Previous classloader is " + prevClassLoader);
+ URL [] jarUrlArray = {new URL(urlPath)};
+ URLClassLoader cl = URLClassLoader.newInstance(jarUrlArray, prevClassLoader);
+ try {
+ if (null != testClassName) {
+ // try to load a class from the jar to force loading now.
+ Class.forName(testClassName, true, cl);
+ }
+ LOG.info("Loaded jar into current JVM: " + urlPath);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load jar " + jarFile + " into JVM. (Could not find class "
+ + testClassName + ".)", cnfe);
+ }
+
+ LOG.info("Added classloader for jar " + jarFile + ": " + cl);
+ Thread.currentThread().setContextClassLoader(cl);
+ return prevClassLoader;
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java Tue May 26 10:29:38 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.io.File;
+import java.io.FileNotFoundException;
+
/**
 * Recursive file listing under a specified directory.
 *
 * Taken from http://www.javapractices.com/topic/TopicAction.do?Id=68
 * Used under the terms of the CC Attribution license:
 * http://creativecommons.org/licenses/by/3.0/
 *
 * Method by Alex Wong (javapractices.com)
 */
public final class FileListing {

  private FileListing() { }

  /**
   * Demonstrate use.
   *
   * @param aArgs - <tt>aArgs[0]</tt> is the full name of an existing
   * directory that can be read.
   */
  public static void main(String... aArgs) throws FileNotFoundException {
    if (aArgs.length < 1) {
      System.err.println("Usage: FileListing <directory>");
      return;
    }

    File startingDirectory = new File(aArgs[0]);
    List<File> files = FileListing.getFileListing(startingDirectory);

    // print out all file names, in the order of File.compareTo()
    for (File file : files) {
      System.out.println(file);
    }
  }

  /**
   * Recursively walk a directory tree and return a List of all
   * Files found (directories included); the List is sorted using
   * File.compareTo().
   *
   * @param aStartingDir is a valid directory, which can be read.
   * @throws FileNotFoundException if aStartingDir does not exist.
   * @throws IllegalArgumentException if aStartingDir is null, is not a
   *     directory, or cannot be read.
   */
  public static List<File> getFileListing(File aStartingDir) throws FileNotFoundException {
    validateDirectory(aStartingDir);
    List<File> result = getFileListingNoSort(aStartingDir);
    Collections.sort(result);
    return result;
  }

  // PRIVATE //

  /**
   * Unsorted recursive walk. Only real directories are descended into, so
   * dangling symlinks and special files are listed but not recursed.
   */
  private static List<File> getFileListingNoSort(File aStartingDir) throws FileNotFoundException {
    List<File> result = new ArrayList<File>();
    File[] filesAndDirs = aStartingDir.listFiles();
    if (null == filesAndDirs) {
      // listFiles() returns null on an I/O error (e.g. an unreadable
      // subdirectory); contribute no children instead of throwing an NPE.
      return result;
    }

    for (File file : filesAndDirs) {
      result.add(file); // always add, even if directory
      if (file.isDirectory()) {
        result.addAll(getFileListingNoSort(file));
      }
    }
    return result;
  }

  /**
   * Directory is valid if it exists, does not represent a file, and can be read.
   */
  private static void validateDirectory(File aDirectory) throws FileNotFoundException {
    if (aDirectory == null) {
      throw new IllegalArgumentException("Directory should not be null.");
    }
    if (!aDirectory.exists()) {
      throw new FileNotFoundException("Directory does not exist: " + aDirectory);
    }
    if (!aDirectory.isDirectory()) {
      throw new IllegalArgumentException("Is not a directory: " + aDirectory);
    }
    if (!aDirectory.canRead()) {
      throw new IllegalArgumentException("Directory cannot be read: " + aDirectory);
    }
  }
}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ImportError.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ImportError.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ImportError.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ImportError.java Tue May 26 10:29:38 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
/**
 * Checked exception signalling a general failure during the import process.
 */
@SuppressWarnings("serial")
public class ImportError extends Exception {

  /** Creates an error whose message is the literal text "ImportError". */
  public ImportError() {
    this("ImportError");
  }

  /** @param message description of the failure. */
  public ImportError(final String message) {
    super(message);
  }

  /**
   * @param message description of the failure.
   * @param cause the underlying exception.
   */
  public ImportError(final String message, final Throwable cause) {
    super(message, cause);
  }

  /** @param cause the underlying exception; its toString() becomes the message. */
  public ImportError(final Throwable cause) {
    super(cause);
  }
}
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ResultSetPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ResultSetPrinter.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ResultSetPrinter.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ResultSetPrinter.java Tue May 26 10:29:38 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility methods to format and print ResultSet objects
+ *
+ *
+ */
+public class ResultSetPrinter {
+
+ public static final Log LOG = LogFactory.getLog(ResultSetPrinter.class.getName());
+
+ // max output width to allocate to any column of the printed results.
+ private static final int MAX_COL_WIDTH = 20;
+
+ // length of the byte buffer, in bytes, to allocate.
+ private static final int BUFFER_LEN = 4096;
+
+ // maximum number of characters to deserialize from the stringbuilder
+ // into the byte buffer at a time. Factor of 2 off b/c of Unicode.
+ private static final int MAX_CHARS = 2048;
+
+ private ByteBuffer bytebuf;
+ private char [] charArray;
+
+ public ResultSetPrinter() {
+ bytebuf = ByteBuffer.allocate(BUFFER_LEN);
+ charArray = new char[MAX_CHARS];
+ }
+
+ /**
+ * Print 'str' to the string builder, padded to 'width' chars
+ */
+ private static void printPadded(StringBuilder sb, String str, int width) {
+ int numPad;
+ if (null == str) {
+ sb.append("(null)");
+ numPad = width - "(null)".length();
+ } else {
+ sb.append(str);
+ numPad = width - str.length();
+ }
+
+ for (int i = 0; i < numPad; i++) {
+ sb.append(' ');
+ }
+ }
+
+
+ /**
+ * Takes the contents of the StringBuilder and prints it on the OutputStream
+ */
+ private void sendToStream(StringBuilder sb, OutputStream os) throws IOException {
+
+ int pos = 0; // current pos in the string builder
+ int len = sb.length(); // total length (in characters) to send to os.
+ CharBuffer charbuf = bytebuf.asCharBuffer();
+
+ while (pos < len) {
+ int copyLen = Math.min(sb.length(), MAX_CHARS);
+ sb.getChars(pos, copyLen, charArray, 0);
+
+ charbuf.put(charArray, 0, copyLen);
+ os.write(bytebuf.array());
+
+ pos += copyLen;
+ }
+
+ }
+
+ private static final String COL_SEPARATOR = " | ";
+
+ /**
+ * Format the contents of the ResultSet into something that could be printed
+ * neatly; the results are appended to the supplied StringBuilder.
+ */
+ public final void printResultSet(OutputStream os, ResultSet results) throws IOException {
+ try {
+ StringBuilder sbNames = new StringBuilder();
+ int cols = results.getMetaData().getColumnCount();
+
+ int [] colWidths = new int[cols];
+ ResultSetMetaData metadata = results.getMetaData();
+ for (int i = 1; i < cols + 1; i++) {
+ String colName = metadata.getColumnName(i);
+ colWidths[i - 1] = Math.min(metadata.getColumnDisplaySize(i), MAX_COL_WIDTH);
+ if (colName == null || colName.equals("")) {
+ colName = metadata.getColumnLabel(i) + "*";
+ }
+ printPadded(sbNames, colName, colWidths[i - 1]);
+ sbNames.append(COL_SEPARATOR);
+ }
+ sbNames.append('\n');
+
+ StringBuilder sbPad = new StringBuilder();
+ for (int i = 0; i < cols; i++) {
+ for (int j = 0; j < COL_SEPARATOR.length() + colWidths[i]; j++) {
+ sbPad.append('-');
+ }
+ }
+ sbPad.append('\n');
+
+ sendToStream(sbPad, os);
+ sendToStream(sbNames, os);
+ sendToStream(sbPad, os);
+
+ while (results.next()) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i < cols + 1; i++) {
+ printPadded(sb, results.getString(i), colWidths[i - 1]);
+ sb.append(COL_SEPARATOR);
+ }
+ sb.append('\n');
+ sendToStream(sb, os);
+ }
+
+ sendToStream(sbPad, os);
+ } catch (SQLException sqlException) {
+ LOG.error("Error reading from database: " + sqlException.toString());
+ }
+ }
+
+}
+
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Tue May 26 10:29:38 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
+import org.apache.hadoop.sqoop.manager.TestSqlManager;
+import org.apache.hadoop.sqoop.orm.TestClassWriter;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for Sqoop (org.apache.hadoop.sqoop)
+ *
+ *
+ */
+public final class AllTests {
+
+ private AllTests() { }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
+
+ suite.addTestSuite(TestAllTables.class);
+ suite.addTestSuite(TestHsqldbManager.class);
+ suite.addTestSuite(TestSqlManager.class);
+ suite.addTestSuite(TestClassWriter.class);
+ suite.addTestSuite(TestColumnTypes.class);
+ suite.addTestSuite(TestMultiCols.class);
+ suite.addTestSuite(TestOrderBy.class);
+
+ return suite;
+ }
+
+}
+
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java Tue May 26 10:29:38 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
+
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test the --all-tables functionality that can import multiple tables.
+ * ;
+ *
+ *
+ */
+public class TestAllTables extends ImportJobTestCase {
+
+ /**
+ * Create the argv to pass to Sqoop
+ * @return the argv as an array of strings.
+ */
+ private String [] getArgv(boolean includeHadoopFlags) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ args.add("-D");
+ args.add("mapred.job.tracker=local");
+ args.add("-D");
+ args.add("mapred.map.tasks=1");
+ args.add("-D");
+ args.add("fs.default.name=file:///");
+ }
+
+ args.add("--all-tables");
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+
+ return args.toArray(new String[0]);
+ }
+
+ /** the names of the tables we're creating. */
+ private List<String> tableNames;
+
+ /** The strings to inject in the (ordered) tables */
+ private List<String> expectedStrings;
+
+ @Before
+ public void setUp() {
+ // start the server
+ super.setUp();
+
+ // throw away TWOINTTABLE and things we don't care about.
+ try {
+ this.getTestServer().dropExistingSchema();
+ } catch (SQLException sqlE) {
+ fail(sqlE.toString());
+ }
+
+ this.tableNames = new ArrayList<String>();
+ this.expectedStrings = new ArrayList<String>();
+
+ // create two tables.
+ this.expectedStrings.add("A winner");
+ this.expectedStrings.add("is you!");
+
+ for (String expectedStr: this.expectedStrings) {
+ this.createTableForColType("VARCHAR(32) PRIMARY KEY", "'" + expectedStr + "'");
+ this.tableNames.add(this.getTableName());
+ this.removeTableDir();
+ incrementTableNum();
+ }
+ }
+
+ public void testMultiTableImport() throws IOException {
+ String [] argv = getArgv(true);
+ runImport(argv);
+
+ Path warehousePath = new Path(this.getWarehouseDir());
+ for (String tableName : this.tableNames) {
+ Path tablePath = new Path(warehousePath, tableName);
+ Path filePath = new Path(tablePath, "part-00000");
+
+ // dequeue the expected value for this table. This
+ // list has the same order as the tableNames list.
+ String expectedVal = this.expectedStrings.get(0);
+ this.expectedStrings.remove(0);
+
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(new FileInputStream(new File(filePath.toString()))));
+ try {
+ String line = reader.readLine();
+ assertEquals("Table " + tableName + " expected a different string",
+ expectedVal, line);
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java Tue May 26 10:29:38 2009
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test that each of the different SQL Column types that we support
+ * can, in fact, be imported into HDFS. Test that the writable
+ * that we expect to work, does.
+ *
+ * This requires testing:
+ * - That we can pull from the database into HDFS:
+ * readFields(ResultSet), toString()
+ * - That we can pull from mapper to reducer:
+ * write(DataOutput), readFields(DataInput)
+ * - And optionally, that we can push to the database:
+ * write(PreparedStatement)
+ *
+ *
+ *
+ */
+public class TestColumnTypes extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(TestColumnTypes.class.getName());
+
+ /**
+ * Do a full verification test on the singleton value of a given type.
+ * @param colType The SQL type to instantiate the column
+ * @param insertVal The SQL text to insert a value into the database
+ * @param returnVal The string representation of the value as extracted from the db
+ */
+ private void verifyType(String colType, String insertVal, String returnVal) {
+ verifyType(colType, insertVal, returnVal, returnVal);
+ }
+
+ /**
+ * Do a full verification test on the singleton value of a given type.
+ * @param colType The SQL type to instantiate the column
+ * @param insertVal The SQL text to insert a value into the database
+ * @param returnVal The string representation of the value as extracted from the db
+ * @param seqFileVal The string representation of the value as extracted through
+ * the DBInputFormat, serialized, and injected into a SequenceFile and put
+ * through toString(). This may be slightly different than what ResultSet.getString()
+ * returns, which is used by returnVal.
+ */
+ private void verifyType(String colType, String insertVal, String returnVal, String seqFileVal) {
+ createTableForColType(colType, insertVal);
+ verifyReadback(1, returnVal);
+ verifyImport(seqFileVal, null);
+ }
+
+ static final String STRING_VAL_IN = "'this is a short string'";
+ static final String STRING_VAL_OUT = "this is a short string";
+
+ @Test
+ public void testStringCol1() {
+ verifyType("VARCHAR(32)", STRING_VAL_IN, STRING_VAL_OUT);
+ }
+
+ @Test
+ public void testStringCol2() {
+ verifyType("CHAR(32)", STRING_VAL_IN, STRING_VAL_OUT);
+ }
+
+ @Test
+ public void testEmptyStringCol() {
+ verifyType("VARCHAR(32)", "''", "");
+ }
+
+ @Test
+ public void testNullStringCol() {
+ verifyType("VARCHAR(32)", "NULL", null);
+ }
+
+ @Test
+ public void testInt() {
+ verifyType("INTEGER", "42", "42");
+ }
+
+ @Test
+ public void testNullInt() {
+ verifyType("INTEGER", "NULL", null);
+ }
+
+ @Test
+ public void testBit1() {
+ verifyType("BIT", "1", "true");
+ }
+
+ @Test
+ public void testBit2() {
+ verifyType("BIT", "0", "false");
+ }
+
+ @Test
+ public void testBit3() {
+ verifyType("BIT", "false", "false");
+ }
+
+ @Test
+ public void testTinyInt1() {
+ verifyType("TINYINT", "0", "0");
+ }
+
+ @Test
+ public void testTinyInt2() {
+ verifyType("TINYINT", "42", "42");
+ }
+
+ @Test
+ public void testSmallInt1() {
+ verifyType("SMALLINT", "-1024", "-1024");
+ }
+
+ @Test
+ public void testSmallInt2() {
+ verifyType("SMALLINT", "2048", "2048");
+ }
+
+ @Test
+ public void testBigInt1() {
+ verifyType("BIGINT", "10000000000", "10000000000");
+ }
+
+ @Test
+ public void testReal1() {
+ verifyType("REAL", "256", "256.0");
+ }
+
+ @Test
+ public void testReal2() {
+ verifyType("REAL", "256.45", "256.45");
+ }
+
+ @Test
+ public void testFloat1() {
+ verifyType("FLOAT", "256", "256.0");
+ }
+
+ @Test
+ public void testFloat2() {
+ verifyType("FLOAT", "256.45", "256.45");
+ }
+
+ @Test
+ public void testDouble1() {
+ verifyType("DOUBLE", "-256", "-256.0");
+ }
+
+ @Test
+ public void testDouble2() {
+ verifyType("DOUBLE", "256.45", "256.45");
+ }
+
+ @Test
+ public void testDate1() {
+ verifyType("DATE", "'2009-1-12'", "2009-01-12");
+ }
+
+ @Test
+ public void testDate2() {
+ verifyType("DATE", "'2009-01-12'", "2009-01-12");
+ }
+
+ @Test
+ public void testDate3() {
+ verifyType("DATE", "'2009-04-24'", "2009-04-24");
+ }
+
+ @Test
+ public void testTime1() {
+ verifyType("TIME", "'12:24:00'", "12:24:00");
+ }
+
+ @Test
+ public void testTime2() {
+ verifyType("TIME", "'06:24:00'", "06:24:00");
+ }
+
+ @Test
+ public void testTime3() {
+ verifyType("TIME", "'6:24:00'", "06:24:00");
+ }
+
+ @Test
+ public void testTime4() {
+ verifyType("TIME", "'18:24:00'", "18:24:00");
+ }
+
+ @Test
+ public void testTimestamp1() {
+ verifyType("TIMESTAMP", "'2009-04-24 18:24:00'",
+ "2009-04-24 18:24:00.000000000",
+ "2009-04-24 18:24:00.0");
+ }
+
+ @Test
+ public void testTimestamp2() {
+ verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
+ "2009-04-24 18:24:00.000200000",
+ "2009-04-24 18:24:00.0002");
+ }
+
+ @Test
+ public void testTimestamp3() {
+ verifyType("TIMESTAMP", "null", null);
+ }
+
+ @Test
+ public void testNumeric1() {
+ verifyType("NUMERIC", "1", "1");
+ }
+
+ @Test
+ public void testNumeric2() {
+ verifyType("NUMERIC", "-10", "-10");
+ }
+
+ @Test
+ public void testNumeric3() {
+ verifyType("NUMERIC", "3.14159", "3.14159");
+ }
+
+ @Test
+ public void testNumeric4() {
+ verifyType("NUMERIC", "30000000000000000000000000.14159", "30000000000000000000000000.14159");
+ }
+
+ @Test
+ public void testNumeric5() {
+ verifyType("NUMERIC", "999999999999999999999999999999.14159", "999999999999999999999999999999.14159");
+ }
+
+ @Test
+ public void testNumeric6() {
+ verifyType("NUMERIC", "-999999999999999999999999999999.14159", "-999999999999999999999999999999.14159");
+ }
+
+ @Test
+ public void testDecimal1() {
+ verifyType("DECIMAL", "1", "1");
+ }
+
+ @Test
+ public void testDecimal2() {
+ verifyType("DECIMAL", "-10", "-10");
+ }
+
+ @Test
+ public void testDecimal3() {
+ verifyType("DECIMAL", "3.14159", "3.14159");
+ }
+
+ @Test
+ public void testDecimal4() {
+ verifyType("DECIMAL", "30000000000000000000000000.14159", "30000000000000000000000000.14159");
+ }
+
+ @Test
+ public void testDecimal5() {
+ verifyType("DECIMAL", "999999999999999999999999999999.14159", "999999999999999999999999999999.14159");
+ }
+
+ @Test
+ public void testDecimal6() {
+ verifyType("DECIMAL", "-999999999999999999999999999999.14159", "-999999999999999999999999999999.14159");
+ }
+
+ @Test
+ public void testLongVarChar() {
+ verifyType("LONGVARCHAR", "'this is a long varchar'", "this is a long varchar");
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java Tue May 26 10:29:38 2009
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test cases that import rows containing multiple columns,
+ * some of which may contain null values.
+ *
+ * Also test loading only selected columns from the db.
+ *
+ *
+ */
+public class TestMultiCols extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(TestMultiCols.class.getName());
+
+ /**
+ * Do a full import verification test on a table containing one row
+ * @param types the types of the columns to insert
+ * @param insertVals the SQL text to use to insert each value
+ * @param validateVals the text to expect when retrieving each value from
+ * the db
+ * @param validateLine the text to expect as a toString() of the entire row,
+ * as imported by the tool
+ * @param importColumns The list of columns to import
+ */
+ private void verifyTypes(String [] types , String [] insertVals,
+ String validateVals [], String validateLine) {
+ verifyTypes(types, insertVals, validateVals, validateLine, null);
+ }
+
+ private void verifyTypes(String [] types , String [] insertVals,
+ String validateVals [], String validateLine, String [] importColumns) {
+
+ createTableWithColTypes(types, insertVals);
+
+ int i = 0;
+ for (String val : validateVals) {
+ verifyReadback(++i, val);
+ LOG.debug("Verified column " + i + " as value: " + val);
+ }
+
+ verifyImport(validateLine, importColumns);
+ LOG.debug("Verified input line as " + validateLine + " -- ok!");
+ }
+
+ public void testThreeStrings() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'bar'", "'baz'" };
+ String [] validateVals = { "foo", "bar", "baz" };
+ String validateLine = "foo,bar,baz";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testStringsWithNull1() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "null", "'baz'" };
+ String [] validateVals = { "foo", null, "baz" };
+ String validateLine = "foo,null,baz";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testStringsWithNull2() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "null", "'foo'", "'baz'" };
+ String [] validateVals = { null, "foo", "baz" };
+ String validateLine = "null,foo,baz";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testStringsWithNull3() {
+ String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
+ String [] insertVals = { "'foo'", "'baz'", "null"};
+ String [] validateVals = { "foo", "baz", null };
+ String validateLine = "foo,baz,null";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testThreeInts() {
+ String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+ String [] insertVals = { "1", "2", "3" };
+ String [] validateVals = { "1", "2", "3" };
+ String validateLine = "1,2,3";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testIntsWithNulls() {
+ String [] types = { "INTEGER", "INTEGER", "INTEGER" };
+ String [] insertVals = { "1", "null", "3" };
+ String [] validateVals = { "1", null, "3" };
+ String validateLine = "1,null,3";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed1() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "1", "'meep'", "'2009-12-31'" };
+ String [] validateVals = { "1", "meep", "2009-12-31" };
+ String validateLine = "1,meep,2009-12-31";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed2() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "null", "'meep'", "'2009-12-31'" };
+ String [] validateVals = { null, "meep", "2009-12-31" };
+ String validateLine = "null,meep,2009-12-31";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed3() {
+ String [] types = { "INTEGER", "VARCHAR(32)", "DATE" };
+ String [] insertVals = { "1", "'meep'", "null" };
+ String [] validateVals = { "1", "meep", null };
+ String validateLine = "1,meep,null";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed4() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "-42", "17", "33333333333333333333333.1714" };
+ String [] validateVals = { "-42", "17", "33333333333333333333333.1714" };
+ String validateLine = "-42,17,33333333333333333333333.1714";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed5() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "null", "17", "33333333333333333333333.0" };
+ String [] validateVals = { null, "17", "33333333333333333333333.0" };
+ String validateLine = "null,17,33333333333333333333333.0";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ public void testMixed6() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String [] validateVals = { "33333333333333333333333", "17", "-42" };
+ String validateLine = "33333333333333333333333,17,-42";
+
+ verifyTypes(types, insertVals, validateVals, validateLine);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // the tests below here test the --columns parameter and ensure that
+ // we can selectively import only certain columns.
+ //////////////////////////////////////////////////////////////////////////
+
+ public void testSkipFirstCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String [] validateVals = { "33333333333333333333333", "17", "-42" };
+ String validateLine = "17,-42";
+
+ String [] loadCols = {"DATA_COL1", "DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateVals, validateLine, loadCols);
+ }
+
+ public void testSkipSecondCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String [] validateVals = { "33333333333333333333333", "17", "-42" };
+ String validateLine = "33333333333333333333333,-42";
+
+ String [] loadCols = {"DATA_COL0", "DATA_COL2"};
+
+ verifyTypes(types, insertVals, validateVals, validateLine, loadCols);
+ }
+
+ public void testSkipThirdCol() {
+ String [] types = { "NUMERIC", "INTEGER", "NUMERIC" };
+ String [] insertVals = { "33333333333333333333333", "17", "-42"};
+ String [] validateVals = { "33333333333333333333333", "17", "-42" };
+ String validateLine = "33333333333333333333333,17";
+
+ String [] loadCols = {"DATA_COL0", "DATA_COL1"};
+
+ verifyTypes(types, insertVals, validateVals, validateLine, loadCols);
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java Tue May 26 10:29:38 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
+import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.testutil.SeqFileReader;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that --order-by works
+ *
+ *
+ */
+public class TestOrderBy extends ImportJobTestCase {
+
+ /**
+ * Create the argv to pass to Sqoop
+ * @return the argv as an array of strings.
+ */
+ private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String orderByCol) {
+ String columnsString = "";
+ for (String col : colNames) {
+ columnsString += col + ",";
+ }
+
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ args.add("-D");
+ args.add("mapred.job.tracker=local");
+ args.add("-D");
+ args.add("mapred.map.tasks=1");
+ args.add("-D");
+ args.add("fs.default.name=file:///");
+ }
+
+ args.add("--table");
+ args.add(HsqldbTestServer.getTableName());
+ args.add("--columns");
+ args.add(columnsString);
+ args.add("--order-by");
+ args.add(orderByCol);
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--as-sequencefile");
+
+ return args.toArray(new String[0]);
+ }
+
+ // this test just uses the two int table.
+ protected String getTableName() {
+ return HsqldbTestServer.getTableName();
+ }
+
+
+ /**
+ * Given a comma-delimited list of integers, grab and parse the first int
+ * @param str a comma-delimited list of values, the first of which is an int.
+ * @return the first field in the string, cast to int
+ */
+ private int getFirstInt(String str) {
+ String [] parts = str.split(",");
+ return Integer.parseInt(parts[0]);
+ }
+
+ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSum)
+ throws IOException {
+
+ String [] columns = HsqldbTestServer.getFieldNames();
+ ClassLoader prevClassLoader = null;
+ SequenceFile.Reader reader = null;
+
+ String [] argv = getArgv(true, columns, orderByCol);
+ runImport(argv);
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(getArgv(false, columns, orderByCol));
+
+ CompilationManager compileMgr = new CompilationManager(opts);
+ String jarFileName = compileMgr.getJarFilename();
+
+ prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
+
+ reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
+
+ // here we can actually instantiate (k, v) pairs.
+ Configuration conf = new Configuration();
+ Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ if (reader.next(key) == null) {
+ fail("Empty SequenceFile during import");
+ }
+
+ // make sure that the value we think should be at the top, is.
+ reader.getCurrentValue(val);
+ assertEquals("Invalid ordering within sorted SeqFile", firstValStr, val.toString());
+
+ // We know that these values are two ints separated by a ',' character.
+ // Since this is all dynamic, though, we don't want to actually link against
+ // the class and use its methods. So we just parse this back into int fields manually.
+ // Sum them up and ensure that we get the expected total for the first column, to
+ // verify that we got all the results from the db into the file.
+ int curSum = getFirstInt(val.toString());
+
+ // now sum up everything else in the file.
+ while (reader.next(key) != null) {
+ reader.getCurrentValue(val);
+ curSum += getFirstInt(val.toString());
+ }
+
+ assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
+ } catch (InvalidOptionsException ioe) {
+ fail(ioe.toString());
+ } finally {
+ IOUtils.closeStream(reader);
+
+ if (null != prevClassLoader) {
+ ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+ }
+ }
+ }
+
+ public void testOrderByFirstCol() throws IOException {
+ String orderByCol = "INTFIELD1";
+ runOrderByTest(orderByCol, "1,8", HsqldbTestServer.getFirstColSum());
+ }
+
+ public void testOrderBySecondCol() throws IOException {
+ String orderByCol = "INTFIELD2";
+ runOrderByTest(orderByCol, "7,2", HsqldbTestServer.getFirstColSum());
+ }
+}
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestHsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestHsqldbManager.java?rev=778646&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestHsqldbManager.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/TestHsqldbManager.java Tue May 26 10:29:38 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.sql.SQLException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+
+/**
+ * Test HsqldbManager-specific functionality that overrides SqlManager behavior
+ *
+ *
+ */
+public class TestHsqldbManager extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(TestHsqldbManager.class.getName());
+
+ // instance variables populated during setUp, used during tests
+ private HsqldbTestServer testServer;
+ private ConnManager manager;
+
+ @Before
+ public void setUp() {
+ testServer = new HsqldbTestServer();
+ try {
+ testServer.resetServer();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ } catch (ClassNotFoundException cnfe) {
+ LOG.error("Could not find class for db driver: " + cnfe.toString());
+ fail("Could not find class for db driver: " + cnfe.toString());
+ }
+
+ manager = testServer.getManager();
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ manager.close();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ }
+ }
+
+ // note: hsql returns only the "PUBLIC" schema name; not individual user db names.
+ @Test
+ public void testListDatabases() {
+ String [] databases = manager.listDatabases();
+
+ assertNotNull("manager returned no database list", databases);
+ assertEquals("Database list should be length 1", 1, databases.length);
+ assertEquals(HsqldbTestServer.getSchemaName(), databases[0]);
+ }
+
+}
+