You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/03/05 07:23:19 UTC
[2/2] git commit: SQOOP-846: Provide a connector for Netezza
appliances
Updated Branches:
refs/heads/trunk 34bdf07bc -> 0d5f73ad8
SQOOP-846: Provide a connector for Netezza appliances
(Venkat Ranganathan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0d5f73ad
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0d5f73ad
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0d5f73ad
Branch: refs/heads/trunk
Commit: 0d5f73ad8dccd4630e4762361513b1583b2b1a2f
Parents: 34bdf07
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Mar 4 22:22:46 2013 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Mar 4 22:22:46 2013 -0800
----------------------------------------------------------------------
src/docs/user/connectors.txt | 67 ++++
src/java/org/apache/sqoop/lib/DelimiterSet.java | 11 +
.../sqoop/manager/DefaultManagerFactory.java | 6 +
.../apache/sqoop/manager/DirectNetezzaManager.java | 249 +++++++++++++
src/java/org/apache/sqoop/manager/MySQLUtils.java | 14 +-
.../org/apache/sqoop/manager/NetezzaManager.java | 222 +++++++++++
.../db/netezza/NetezzaDBDataSliceSplitter.java | 60 +++
.../netezza/NetezzaExternalTableExportMapper.java | 221 +++++++++++
.../netezza/NetezzaExternalTableImportMapper.java | 224 +++++++++++
.../NetezzaExternalTableRecordExportMapper.java | 38 ++
.../NetezzaExternalTableTextExportMapper.java | 38 ++
.../db/netezza/NetezzaJDBCStatementRunner.java | 95 +++++
.../netezza/NetezzaDataDrivenDBInputFormat.java | 69 ++++
.../netezza/NetezzaExternalTableExportJob.java | 117 ++++++
.../netezza/NetezzaExternalTableImportJob.java | 123 ++++++
.../netezza/NetezzaExternalTableInputFormat.java | 112 ++++++
.../netezza/NetezzaExternalTableInputSplit.java | 74 ++++
.../manager/DirectNetezzaExportManualTest.java | 286 +++++++++++++++
.../sqoop/manager/NetezzaImportManualTest.java | 225 ++++++++++++
.../cloudera/sqoop/manager/NetezzaTestUtils.java | 93 +++++
20 files changed, 2338 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 7dd2a2e..c172c4b 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -243,3 +243,70 @@ or map tasks fail.
When reduce task fails,
staging table for the task are left for manual retry and
users must take care of it.
+
+Netezza Connector
+~~~~~~~~~~~~~~~~~
+
+Extra arguments
+^^^^^^^^^^^^^^^
+
+The extra arguments supported by the Netezza connector are listed below:
+
+.Supported Netezza extra arguments:
+[grid="all"]
+`-------------------------------------`----------------------------------------
+Argument Description
+-------------------------------------------------------------------------------
++--partitioned-access+ Whether each mapper acts on a subset\
+ of the table's data slices or on all\
+ of them. Default is "false" for\
+ standard mode and "true" for direct mode.
++--max-errors+ Applicable only in direct mode.\
+ This option specifies the error threshold\
+ per mapper while transferring data. If\
+ the number of errors encountered exceeds\
+ this threshold then the job will fail.\
+ Default value is 1.
++--log-dir+ Applicable only in direct mode.\
+ Specifies the directory where Netezza\
+ external table operation logs are stored.\
+ Default value is /tmp.
+--------------------------------------------------------------------------------
+
+
+Direct Mode
+^^^^^^^^^^^
+The Netezza connector supports an optimized data transfer facility using the
+Netezza external tables feature. Each map task of the Netezza connector's
+import job works on a subset of the Netezza partitions and transparently
+creates and uses an external table to transport data. Similarly, export jobs
+use an external table to push data quickly into the Netezza system. Direct
+mode does not support staging tables, upsert options, etc.
+
+Here is an example of a complete command line for import using the Netezza
+external table feature.
+
+----
+$ sqoop import \
+ --direct \
+ --connect jdbc:netezza://nzhost:5480/sqoop \
+ --table nztable \
+ --username nzuser \
+ --password nzpass \
+ --target-dir hdfsdir
+
+----
+
+Here is an example of a complete command line for export with tab as the field
+terminator character.
+
+----
+$ sqoop export \
+ --direct \
+ --connect jdbc:netezza://nzhost:5480/sqoop \
+ --table nztable \
+ --username nzuser \
+ --password nzpass \
+ --export-dir hdfsdir \
+ --input-fields-terminated-by "\t"
+----
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/lib/DelimiterSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/DelimiterSet.java b/src/java/org/apache/sqoop/lib/DelimiterSet.java
index 4e9bcab..ef62ba0 100644
--- a/src/java/org/apache/sqoop/lib/DelimiterSet.java
+++ b/src/java/org/apache/sqoop/lib/DelimiterSet.java
@@ -33,8 +33,19 @@ public class DelimiterSet implements Cloneable {
// If true, then the enclosed-by character is applied to every
// field, not just ones containing embedded delimiters.
+
private boolean encloseRequired;
+ public static final String OUTPUT_FIELD_DELIM_KEY =
+ "sqoop.output.field.delim";
+ public static final String OUTPUT_RECORD_DELIM_KEY =
+ "sqoop.output.record.delim";
+ public static final String OUTPUT_ENCLOSED_BY_KEY =
+ "sqoop.output.enclosed.by";
+ public static final String OUTPUT_ESCAPED_BY_KEY =
+ "sqoop.output.escaped.by";
+ public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
+ "sqoop.output.enclose.required";
/**
* Create a delimiter set with the default delimiters
* (comma for fields, newline for records).
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java b/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
index 54eb258..72a955c 100644
--- a/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
+++ b/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
@@ -68,6 +68,12 @@ public class DefaultManagerFactory
return new SQLServerManager(options);
} else if (scheme.startsWith("jdbc:db2:")) {
return new Db2Manager(options);
+ } else if (scheme.startsWith("jdbc:netezza:")) {
+ if (options.isDirect()) {
+ return new DirectNetezzaManager(options);
+ } else {
+ return new NetezzaManager(options);
+ }
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
new file mode 100644
index 0000000..0a1e605
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob;
+import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.cli.RelatedOptions;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages direct mode transfers from Netezza databases using the external table
+ * options.
+ */
+public class DirectNetezzaManager extends NetezzaManager {
+
+  public static final Log LOG = LogFactory.getLog(DirectNetezzaManager.class
+      .getName());
+
+  /** Configuration key holding the external table log directory. */
+  public static final String NETEZZA_LOG_DIR_OPT = "netezza.log.dir";
+  /** Extra argument ({@code --log-dir}) that sets the log directory. */
+  public static final String NETEZZA_LOG_DIR_LONG_ARG = "log-dir";
+
+  /** Configuration key holding the per-mapper error threshold. */
+  public static final String NETEZZA_ERROR_THRESHOLD_OPT =
+      "netezza.error.threshold";
+  /** Extra argument ({@code --max-errors}) that sets the error threshold. */
+  public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG =
+      "max-errors";
+
+  // Looks up a table by owner and name in the Netezza object catalog.
+  private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
+      "SELECT 1 FROM _V_OBJECTS WHERE OWNER= ? "
+      + " AND OBJNAME = ? and OBJTYPE = 'TABLE'";
+
+  /**
+   * Creates a direct-mode manager and applies the Netezza-specific extra
+   * arguments to the job configuration.
+   *
+   * @param opts the Sqoop options for this job
+   * @throws RuntimeException if the extra arguments cannot be parsed
+   */
+  public DirectNetezzaManager(SqoopOptions opts) {
+    super(opts);
+    try {
+      handleNetezzaExtraArgs(options);
+    } catch (ParseException pe) {
+      throw new RuntimeException(pe.getMessage(), pe);
+    }
+  }
+
+  /**
+   * Checks that the export target table exists. A table name of the form
+   * {@code OWNER.TABLE} is split like the Oracle manager does; otherwise the
+   * connecting user name is used as the owner. The manager's connection is
+   * closed afterwards.
+   *
+   * @throws IOException if the table is not found or the lookup fails
+   * @throws ExportException declared for callers; not thrown here
+   */
+  private void checkTable() throws IOException, ExportException {
+    String tableOwner = this.options.getUsername();
+    String tableName = this.options.getTableName();
+    String shortTableName = tableName;
+    int qualifierIndex = tableName.indexOf('.');
+    if (qualifierIndex != -1) {
+      tableOwner = tableName.substring(0, qualifierIndex);
+      shortTableName = tableName.substring(qualifierIndex + 1);
+    }
+
+    try {
+      try {
+        Connection conn = getConnection();
+        PreparedStatement ps = conn.prepareStatement(
+            QUERY_CHECK_DICTIONARY_FOR_TABLE,
+            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        // Nested finally blocks guarantee the statement is closed even if
+        // closing the result set fails.
+        try {
+          ps.setString(1, tableOwner);
+          ps.setString(2, shortTableName);
+          ResultSet rs = ps.executeQuery();
+          try {
+            if (!rs.next()) {
+              String message = tableName
+                  + " is not a valid Netezza table. "
+                  + "Please make sure that you have connected to the Netezza DB "
+                  + "and the table name is right. The current values are\n\t"
+                  + " connection string : " + options.getConnectString()
+                  + "\n\t table owner : " + tableOwner + "\n\t table name : "
+                  + shortTableName;
+              LOG.error(message);
+              throw new IOException(message);
+            }
+          } finally {
+            rs.close();
+          }
+        } finally {
+          ps.close();
+        }
+      } finally {
+        close();
+      }
+    } catch (SQLException sqle) {
+      throw new IOException("SQL exception checking table "
+          + sqle.getMessage(), sqle);
+    }
+  }
+
+  /**
+   * Exports data stored in HDFS into a Netezza table using external tables.
+   *
+   * @throws ExportException if the input enclosed-by or escaped-by
+   *     characters are not supported in direct mode
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    options = context.getOptions();
+    context.setConnManager(this);
+
+    checkTable(); // Throws an exception if the table is not valid.
+
+    char qc = (char) options.getInputEnclosedBy();
+    char ec = (char) options.getInputEscapedBy();
+
+    if (qc > 0 && !(qc == '"' || qc == '\'')) {
+      throw new ExportException("Input enclosed-by character must be '\"' "
+          + "or ''' for netezza direct mode exports");
+    }
+    if (ec > 0 && ec != '\\') {
+      throw new ExportException("Input escaped-by character must be '\\' "
+          + "for netezza direct mode exports");
+    }
+    NetezzaExternalTableExportJob exporter =
+        new NetezzaExternalTableExportJob(context);
+    exporter.runExport();
+  }
+
+  /**
+   * Imports the table into HDFS by using Netezza external tables to pull out
+   * the data from the database and upload the files directly to HDFS.
+   *
+   * @throws IOException if no table name is given (free-form query import)
+   * @throws ImportException if the output enclosed-by or escaped-by
+   *     characters are not supported in direct mode
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    context.setConnManager(this);
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    SqoopOptions opts = context.getOptions();
+
+    if (null == tableName) {
+      LOG.error(
+          "Netezza external table import does not support query imports.");
+      LOG.error("Do not use --direct and --query together for Netezza.");
+      throw new IOException(
+          "Null tableName for Netezza external table import.");
+    }
+
+    char qc = opts.getOutputEnclosedBy();
+    char ec = opts.getOutputEscapedBy();
+
+    if (qc > 0 && !(qc == '"' || qc == '\'')) {
+      throw new ImportException("Output enclosed-by character must be '\"' "
+          + "or ''' for netezza direct mode imports");
+    }
+    if (ec > 0 && ec != '\\') {
+      throw new ImportException("Output escaped-by character must be '\\' "
+          + "for netezza direct mode imports");
+    }
+
+    // Direct Netezza Manager uses the datasliceid, so no split column
+    // is used.
+    NetezzaExternalTableImportJob importer =
+        new NetezzaExternalTableImportJob(opts, context);
+
+    LOG.info("Beginning netezza fast path import");
+
+    if (opts.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+      LOG.warn("File import layout " + opts.getFileLayout()
+          + " is not supported by");
+      LOG.warn("Netezza direct import; import will proceed as text files.");
+    }
+
+    importer.runImport(tableName, jarFile, null, opts.getConf());
+  }
+
+  /**
+   * Returns the extra arguments understood in direct mode.
+   * {@code --partitioned-access} is accepted so that it does not stop option
+   * parsing (which would silently drop every option after it), but its value
+   * is ignored: direct mode always uses data-slice aligned access.
+   */
+  @Override
+  protected RelatedOptions getNetezzaExtraOpts() {
+    RelatedOptions netezzaOpts =
+        new RelatedOptions("Netezza Connector Direct mode options");
+
+    netezzaOpts.addOption(OptionBuilder
+        .withArgName(NETEZZA_ERROR_THRESHOLD_OPT).hasArg()
+        .withDescription("Error threshold for the job")
+        .withLongOpt(NETEZZA_ERROR_THRESHOLD_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT)
+        .hasArg().withDescription("Netezza log directory")
+        .withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder
+        .withArgName(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT).hasOptionalArg()
+        .withDescription("Data slice aligned import (always on in direct mode)")
+        .withLongOpt(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG).create());
+    return netezzaOpts;
+  }
+
+  /**
+   * Copies the direct-mode extra arguments into the job configuration and
+   * forces data-slice aligned access on.
+   *
+   * @throws ParseException if the extra arguments are malformed or
+   *     {@code --max-errors} is not an integer
+   */
+  private void handleNetezzaExtraArgs(SqoopOptions opts)
+      throws ParseException {
+
+    Configuration conf = opts.getConf();
+
+    String[] extraArgs = opts.getExtraArgs();
+
+    RelatedOptions netezzaOpts = getNetezzaExtraOpts();
+    CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
+    if (cmdLine.hasOption(NETEZZA_ERROR_THRESHOLD_LONG_ARG)) {
+      String value = cmdLine.getOptionValue(NETEZZA_ERROR_THRESHOLD_LONG_ARG);
+      try {
+        conf.setInt(NETEZZA_ERROR_THRESHOLD_OPT,
+            Integer.parseInt(value.trim()));
+      } catch (NumberFormatException nfe) {
+        throw new ParseException("Invalid value for --"
+            + NETEZZA_ERROR_THRESHOLD_LONG_ARG + ": " + value);
+      }
+    }
+    if (cmdLine.hasOption(NETEZZA_LOG_DIR_LONG_ARG)) {
+      String dir = cmdLine.getOptionValue(NETEZZA_LOG_DIR_LONG_ARG);
+      conf.set(NETEZZA_LOG_DIR_OPT, dir);
+    }
+
+    // Always true for Netezza direct mode access
+    conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
+  }
+
+  /** Direct mode writes through external tables and cannot stage exports. */
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/MySQLUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLUtils.java b/src/java/org/apache/sqoop/manager/MySQLUtils.java
index ef18818..c86cf1a 100644
--- a/src/java/org/apache/sqoop/manager/MySQLUtils.java
+++ b/src/java/org/apache/sqoop/manager/MySQLUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.config.ConfigurationConstants;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.util.DirectImportUtils;
+import org.apache.sqoop.lib.DelimiterSet;
/**
* Helper methods and constants for MySQL imports/exports.
@@ -48,15 +49,16 @@ public final class MySQLUtils {
public static final String MYSQL_IMPORT_CMD = "mysqlimport";
public static final String OUTPUT_FIELD_DELIM_KEY =
- "sqoop.output.field.delim";
+ DelimiterSet.OUTPUT_FIELD_DELIM_KEY;
public static final String OUTPUT_RECORD_DELIM_KEY =
- "sqoop.output.record.delim";
+ DelimiterSet.OUTPUT_RECORD_DELIM_KEY;
public static final String OUTPUT_ENCLOSED_BY_KEY =
- "sqoop.output.enclosed.by";
+ DelimiterSet.OUTPUT_ENCLOSED_BY_KEY;
public static final String OUTPUT_ESCAPED_BY_KEY =
- "sqoop.output.escaped.by";
+ DelimiterSet.OUTPUT_ESCAPED_BY_KEY;
public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
- "sqoop.output.enclose.required";
+ DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY;
+
public static final String TABLE_NAME_KEY =
ConfigurationHelper.getDbInputTableNameProperty();
public static final String CONNECT_STRING_KEY =
@@ -67,6 +69,7 @@ public final class MySQLUtils {
ConfigurationHelper.getDbPasswordProperty();
public static final String WHERE_CLAUSE_KEY =
ConfigurationHelper.getDbInputConditionsProperty();
+
public static final String EXTRA_ARGS_KEY =
"sqoop.mysql.extra.args";
@@ -117,4 +120,3 @@ public final class MySQLUtils {
return tempFile.toString();
}
}
-
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/NetezzaManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/NetezzaManager.java b/src/java/org/apache/sqoop/manager/NetezzaManager.java
new file mode 100644
index 0000000..0ac7717
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/NetezzaManager.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.AsyncSqlOutputFormat;
+import org.apache.sqoop.mapreduce.netezza.NetezzaDataDrivenDBInputFormat;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.cli.RelatedOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to Netezza databases.
+ */
+public class NetezzaManager extends GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(NetezzaManager.class
+      .getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "org.netezza.Driver";
+
+  // set to true after we warn the user that we can use direct fastpath.
+  protected static boolean directModeWarningPrinted = false;
+
+  // set to true after we warn the user that they should consider using
+  // batching.
+  protected static boolean batchModeWarningPrinted = false;
+
+  /** Configuration key: true when mappers read data-slice aligned subsets. */
+  public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT =
+      "netezza.dataslice.aligned.access";
+
+  /**
+   * Extra argument ({@code --partitioned-access}) requesting data-slice
+   * aligned access. It may be given bare or with an explicit boolean value.
+   */
+  public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG =
+      "partitioned-access";
+
+  public NetezzaManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  @Override
+  public String escapeColName(String colName) {
+    return escapeIdentifier(colName);
+  }
+
+  @Override
+  public String escapeTableName(String tableName) {
+    return escapeIdentifier(tableName);
+  }
+
+  /**
+   * Wraps an identifier in double quotes, doubling any embedded double
+   * quotes.
+   *
+   * @param identifier the identifier to quote; may be null
+   * @return the quoted identifier, or null if {@code identifier} is null
+   */
+  protected String escapeIdentifier(String identifier) {
+    if (identifier == null) {
+      return null;
+    }
+    return "\"" + identifier.replace("\"", "\"\"") + "\"";
+  }
+
+  /**
+   * Rolls back any uncommitted work on an open connection, then delegates
+   * to the superclass {@code close()}.
+   */
+  @Override
+  public void close() throws SQLException {
+    if (this.hasOpenConnection()) {
+      this.getConnection().rollback(); // Rollback any changes
+    }
+
+    super.close();
+  }
+
+  /**
+   * Imports a table over plain JDBC. If {@code --partitioned-access} is
+   * enabled via the extra arguments, data-slice aligned splits are used.
+   *
+   * @throws ImportException if the extra arguments cannot be parsed
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    context.setConnManager(this);
+    // The user probably should have requested --direct to invoke the
+    // external table option. Display a warning (once) informing them of it.
+    if (!NetezzaManager.directModeWarningPrinted) {
+      LOG.warn("It looks like you are importing from Netezza.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a Netezza-specific fast path.");
+
+      NetezzaManager.directModeWarningPrinted = true; // don't repeat
+    }
+    try {
+      handleNetezzaImportExtraArgs(context);
+    } catch (ParseException pe) {
+      throw new ImportException(pe.getMessage(), pe);
+    }
+    // Then run the normal importTable() method.
+    super.importTable(context);
+  }
+
+  /**
+   * Exports over plain JDBC. Outside batch mode, each INSERT carries a
+   * single record.
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    // The user probably should have requested --direct to invoke the
+    // external table option. Display a warning (once) informing them of it.
+    if (!NetezzaManager.directModeWarningPrinted) {
+      LOG.warn("It looks like you are exporting to Netezza.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a Netezza-specific fast path.");
+
+      NetezzaManager.directModeWarningPrinted = true; // don't repeat
+    }
+
+    forceSingleRecordPerStatementUnlessBatch(context);
+    // TODO: consider forcing batch mode for Netezza exports.
+    super.exportTable(context);
+  }
+
+  /**
+   * Updates rows over plain JDBC.
+   *
+   * @throws ExportException if more than one mapper is requested, since
+   *     concurrent Netezza updates can lead to inconsistencies
+   */
+  @Override
+  public void updateTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    if (options.getNumMappers() > 1) {
+      String msg = "Netezza update with multiple mappers can lead to "
+          + "inconsistencies - Please set num-mappers option to 1 in the SQOOP "
+          + "command line for update jobs with Netezza and SQOOP";
+      throw new ExportException(msg);
+    }
+
+    forceSingleRecordPerStatementUnlessBatch(context);
+    super.updateTable(context);
+  }
+
+  /**
+   * Outside batch mode, warns once and forces one record per statement,
+   * because Netezza does not have multi-row inserts.
+   */
+  private void forceSingleRecordPerStatementUnlessBatch(
+      com.cloudera.sqoop.manager.ExportJobContext context) {
+    if (options.isBatchMode()) {
+      return;
+    }
+    if (!NetezzaManager.batchModeWarningPrinted) {
+      LOG.warn("It looks like you are exporting to Netezza in non-batch ");
+      LOG.warn("mode. Still this transfer can be made faster! Use the ");
+      LOG.warn("--batch option to exercise a Netezza-specific fast path.");
+      LOG.warn("Forcing records per statement to 1 in non batch mode");
+
+      NetezzaManager.batchModeWarningPrinted = true; // don't repeat
+    }
+    context.getOptions().getConf()
+        .setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  protected String getCurTimestampQuery() {
+    return "SELECT CURRENT_TIMESTAMP";
+  }
+
+  /**
+   * Returns the extra arguments understood in standard (JDBC) mode.
+   * {@code --partitioned-access} takes an optional boolean value, so it can
+   * be used as a bare flag.
+   */
+  protected RelatedOptions getNetezzaExtraOpts() {
+    RelatedOptions netezzaOpts = new RelatedOptions("Netezza options");
+    netezzaOpts.addOption(OptionBuilder
+        .withArgName(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT).hasOptionalArg()
+        .withDescription("Data slice aligned import")
+        .withLongOpt(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG).create());
+    return netezzaOpts;
+  }
+
+  /**
+   * Records whether data-slice aligned access is enabled and, if so, selects
+   * the Netezza data-driven input format. Extra arguments are only consulted
+   * when more than one map task is configured.
+   *
+   * @throws ParseException if the extra arguments are malformed
+   */
+  private void handleNetezzaImportExtraArgs(ImportJobContext context)
+      throws ParseException {
+
+    SqoopOptions opts = context.getOptions();
+    Configuration conf = opts.getConf();
+
+    String[] extraArgs = opts.getExtraArgs();
+
+    conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false);
+
+    if (extraArgs != null && extraArgs.length > 0
+        && ConfigurationHelper.getConfNumMaps(conf) > 1) {
+      RelatedOptions netezzaOpts = getNetezzaExtraOpts();
+      CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
+      if (cmdLine.hasOption(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG)) {
+        String value =
+            cmdLine.getOptionValue(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG);
+        // A bare flag enables the feature; an explicit value is honored.
+        if (value == null || Boolean.parseBoolean(value.trim())) {
+          conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
+          context.setInputFormat(NetezzaDataDrivenDBInputFormat.class);
+        }
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
new file mode 100644
index 0000000..368a349
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.mapreduce.db.DBSplitter;
+import
+ org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * Netezza specific splitter based on data slice id.
+ */
+public class NetezzaDBDataSliceSplitter implements DBSplitter {
+
+  /**
+   * Creates one split per configured map task. Split {@code i} selects the
+   * rows whose {@code datasliceid % numSplits} equals {@code i}, so each
+   * data slice id falls into exactly one split. Only the lower-bound clause
+   * restricts rows; the upper-bound clause is a constant that is always
+   * true.
+   *
+   * <p>Note: the {@code throws SQLException} clause is omitted because no
+   * SQL work is done in this method.
+   *
+   * @param conf job configuration; the split count is the configured number
+   *     of map tasks, treated as 1 if it is less than 1
+   * @param results unused
+   * @param colName unused; splitting is always on {@code datasliceid}
+   * @return the input splits, never empty
+   */
+  @Override
+  public List<InputSplit> split(Configuration conf, ResultSet results,
+      String colName) {
+    // Clamp so a missing or invalid map count cannot yield zero splits
+    // (which would silently import nothing).
+    int numSplits = Math.max(1, ConfigurationHelper.getConfNumMaps(conf));
+    List<InputSplit> splitList = new ArrayList<InputSplit>(numSplits);
+    for (int i = 0; i < numSplits; ++i) {
+      String lowerBoundClause = " datasliceid % " + numSplits + " = " + i;
+      splitList.add(new DataDrivenDBInputSplit(lowerBoundClause, "1 = 1"));
+    }
+    return splitList;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
new file mode 100644
index 0000000..410a569
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PerfCounters;
+import org.apache.sqoop.util.TaskId;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+
+/**
+ * Netezza export mapper using external tables.
+ *
+ * <p>The mapper creates a named FIFO in the task's local work directory and
+ * starts a {@link NetezzaJDBCStatementRunner} that executes
+ * {@code INSERT INTO <table> SELECT * FROM EXTERNAL '<fifo>' USING (...)}.
+ * Subclasses implement {@code map} and hand each record to
+ * {@link #writeTextRecord(Text)} or {@link #writeSqoopRecord(SqoopRecord)},
+ * which write it into the FIFO. Closing the write end of the FIFO signals
+ * end of input to its reader.</p>
+ *
+ * @param <K> input key type
+ * @param <V> input value type
+ */
+public abstract class NetezzaExternalTableExportMapper<K, V> extends
+    SqoopMapper<K, V, NullWritable, NullWritable> {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableExportMapper.class.getName());
+
+  private Configuration conf;
+  private DBConfiguration dbc;
+  // Named FIFO read by the external table statement.
+  private File fifoFile;
+  // Only used during setup; it is handed to extTableThread, which closes it.
+  private Connection con;
+  // Buffered write end of the FIFO.
+  private OutputStream recordWriter;
+  // Thread executing the external table INSERT statement.
+  private NetezzaJDBCStatementRunner extTableThread;
+  private PerfCounters counter;
+  // Delimiters used to serialize SqoopRecords into the FIFO.
+  private DelimiterSet outputDelimiters;
+
+  /**
+   * Builds the delimiters used to serialize SqoopRecords so that they agree
+   * with the external table options emitted by {@link #getSqlStatement()}:
+   * the configured field delimiter, a newline record terminator, a backslash
+   * escape only when ESCAPECHAR is emitted, and an enclosing quote only when
+   * QUOTEDVALUE is emitted.
+   *
+   * @return the delimiter set for {@link #writeSqoopRecord(SqoopRecord)}
+   */
+  private DelimiterSet buildOutputDelimiters() {
+    char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+    // getSqlStatement() always declares '\' as ESCAPECHAR when ec is set.
+    char escape = ec > 0 ? '\\' : '\000';
+    // Only single and double quotes are declared via QUOTEDVALUE.
+    char enclose = (qc == '\'' || qc == '"') ? qc : '\000';
+    return new DelimiterSet(fd, '\n', enclose, escape, false);
+  }
+
+  /**
+   * Builds the INSERT ... SELECT * FROM EXTERNAL statement that loads the
+   * FIFO contents into the target table.
+   *
+   * @return the SQL text to execute
+   * @throws IOException if a log directory is configured but cannot be
+   *     created or is not a writable directory
+   */
+  private String getSqlStatement() throws IOException {
+
+    char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+
+    int errorThreshold = conf.getInt(
+        DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+    String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+
+    StringBuilder sqlStmt = new StringBuilder(2048);
+
+    sqlStmt.append("INSERT INTO ");
+    sqlStmt.append(dbc.getInputTableName());
+    sqlStmt.append(" SELECT * FROM EXTERNAL '");
+    sqlStmt.append(fifoFile.getAbsolutePath());
+    sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
+    sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
+    sqlStmt.append(" CRINSTRING FALSE ");
+    sqlStmt.append(" DELIMITER ");
+    // The delimiter is passed as its numeric character code.
+    sqlStmt.append(Integer.toString(fd));
+    sqlStmt.append(" ENCODING 'internal' ");
+    if (ec > 0) {
+      sqlStmt.append(" ESCAPECHAR '\\' ");
+    }
+    sqlStmt.append(" FORMAT 'Text' ");
+    sqlStmt.append(" INCLUDEZEROSECONDS TRUE ");
+    sqlStmt.append(" NULLVALUE 'NULL' ");
+    if (qc > 0) {
+      switch (qc) {
+        case '\'':
+          sqlStmt.append(" QUOTEDVALUE SINGLE ");
+          break;
+        case '\"':
+          sqlStmt.append(" QUOTEDVALUE DOUBLE ");
+          break;
+        default:
+          LOG.warn("Unsupported enclosed by character: " + qc
+              + " - ignoring.");
+      }
+    }
+    sqlStmt.append(" MAXERRORS ").append(errorThreshold);
+
+    if (logDir != null) {
+      logDir = logDir.trim();
+      if (logDir.length() > 0) {
+        File logDirPath = new File(logDir);
+        logDirPath.mkdirs();
+        if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+          sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
+        } else {
+          throw new IOException("Unable to create log directory specified");
+        }
+      }
+    }
+    sqlStmt.append(")");
+
+    String stmt = sqlStmt.toString();
+    LOG.debug("SQL generated for external table export" + stmt);
+
+    return stmt;
+  }
+
+  /**
+   * Creates the FIFO, prepares the external table statement, starts the
+   * statement thread and opens the write end of the FIFO.
+   *
+   * @param context task context supplying the configuration
+   * @throws IOException if the FIFO, the log directory, the connection or the
+   *     statement cannot be set up
+   */
+  private void initNetezzaExternalTableExport(Context context)
+      throws IOException {
+    this.conf = context.getConfiguration();
+    dbc = new DBConfiguration(conf);
+    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    this.outputDelimiters = buildOutputDelimiters();
+    this.fifoFile = new File(taskAttemptDir, ("nzexttable-export.txt"));
+    String filename = fifoFile.toString();
+    NamedFifo nf;
+    // Create the FIFO itself.
+    try {
+      nf = new NamedFifo(this.fifoFile);
+      nf.create();
+    } catch (IOException ioe) {
+      // Command failed.
+      LOG.error("Could not create FIFO file " + filename);
+      this.fifoFile = null;
+      throw new IOException(
+          "Could not create FIFO for netezza external table export", ioe);
+    }
+    String sqlStmt = getSqlStatement();
+    boolean cleanup = false;
+    try {
+      con = dbc.getConnection();
+      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+          con, sqlStmt);
+    } catch (SQLException sqle) {
+      cleanup = true;
+      throw new IOException(sqle);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      if (con != null && cleanup) {
+        try {
+          con.close();
+        } catch (Exception e) {
+          LOG.debug("Exception closing connection " + e.getMessage());
+        }
+      }
+      // On success the connection now belongs to extTableThread.
+      con = null;
+    }
+
+    counter = new PerfCounters();
+    extTableThread.start();
+    // We start the JDBC thread first in this case as we want the FIFO reader
+    // to be running.
+    recordWriter = new BufferedOutputStream(new FileOutputStream(nf.getFile()));
+    counter.startClock();
+  }
+
+  /**
+   * Feeds all input records through {@code map} into the FIFO, then closes
+   * the FIFO, waits for the statement thread and reports its failure.
+   *
+   * <p>Close, join and error reporting happen even when the statement thread
+   * is no longer alive on entry, so a failed load is never reported as a
+   * successful one.</p>
+   *
+   * @throws IOException if writing fails or the external table statement
+   *     failed
+   * @throws InterruptedException if interrupted for a reason other than the
+   *     statement thread's failure signal
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    initNetezzaExternalTableExport(context);
+    try {
+      // A dead statement thread has no reader on the FIFO; skip writing.
+      if (extTableThread.isAlive()) {
+        while (context.nextKeyValue()) {
+          // The statement thread interrupts us when it fails; stop feeding
+          // the FIFO once it has terminated.
+          if (Thread.interrupted() && !extTableThread.isAlive()) {
+            break;
+          }
+          map(context.getCurrentKey(), context.getCurrentValue(), context);
+        }
+        cleanup(context);
+      }
+    } finally {
+      try {
+        // Closing the write end signals end of input to the reader.
+        recordWriter.close();
+      } finally {
+        finishExtTableThread();
+      }
+    }
+  }
+
+  /**
+   * Waits for the statement thread, logs the transfer statistics and rethrows
+   * any failure the thread recorded.
+   *
+   * @throws IOException wrapping the statement thread's failure
+   * @throws InterruptedException if interrupted while waiting and the
+   *     statement thread recorded no failure
+   */
+  private void finishExtTableThread() throws IOException,
+      InterruptedException {
+    try {
+      extTableThread.join();
+    } catch (InterruptedException ie) {
+      // NetezzaJDBCStatementRunner interrupts this thread after recording a
+      // failure. In that case report the failure itself, not the interrupt.
+      if (!extTableThread.hasExceptions()) {
+        throw ie;
+      }
+    }
+    counter.stopClock();
+    LOG.info("Transferred " + counter.toString());
+    if (extTableThread.hasExceptions()) {
+      extTableThread.printException();
+      throw new IOException(extTableThread.getExcepton());
+    }
+  }
+
+  /**
+   * Writes a text record, followed by a newline, into the FIFO as UTF-8.
+   *
+   * @param record the line to write
+   * @throws IOException if writing to the FIFO fails
+   */
+  protected void writeTextRecord(Text record) throws IOException,
+      InterruptedException {
+    String outputStr = record.toString() + "\n";
+    byte[] outputBytes = outputStr.getBytes("UTF-8");
+    counter.addBytes(outputBytes.length);
+    recordWriter.write(outputBytes, 0, outputBytes.length);
+  }
+
+  /**
+   * Serializes a SqoopRecord with the delimiters matching the external table
+   * definition and writes it into the FIFO as UTF-8.
+   *
+   * @param sqr the record to write
+   * @throws IOException if writing to the FIFO fails
+   */
+  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
+      InterruptedException {
+    String outputStr = sqr.toString(this.outputDelimiters);
+    byte[] outputBytes = outputStr.getBytes("UTF-8");
+    counter.addBytes(outputBytes.length);
+    recordWriter.write(outputBytes, 0, outputBytes.length);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
new file mode 100644
index 0000000..9e6cab6
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PerfCounters;
+import org.apache.sqoop.util.TaskId;
+
+/**
+ * Netezza import mapper using external tables.
+ */
+public class NetezzaExternalTableImportMapper extends
+ SqoopMapper<Integer, NullWritable, Text, NullWritable> {
+ /**
+ * Create a named FIFO, and start Netezza import connected to that FIFO. A
+ * File object representing the FIFO is in 'fifoFile'.
+ */
+
+ private Configuration conf;
+ private DBConfiguration dbc;
+ private File fifoFile;
+ private int numMappers;
+ private Connection con;
+ private BufferedReader recordReader;
+ public static final Log LOG = LogFactory
+ .getLog(NetezzaExternalTableImportMapper.class.getName());
+ private NetezzaJDBCStatementRunner extTableThread;
+ private PerfCounters counter;
+
+ private String getSqlStatement(int myId) throws IOException {
+
+ char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+ char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+ char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+ int errorThreshold = conf.getInt(
+ DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+ String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+ String[] cols = dbc.getOutputFieldNames();
+ String inputConds = dbc.getInputConditions();
+ StringBuilder sqlStmt = new StringBuilder(2048);
+
+ sqlStmt.append("CREATE EXTERNAL TABLE '");
+ sqlStmt.append(fifoFile.getAbsolutePath());
+ sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
+ sqlStmt.append(" BOOLSTYLE 'T_F' ");
+ sqlStmt.append(" CRINSTRING FALSE ");
+ sqlStmt.append(" DELIMITER ");
+ sqlStmt.append(Integer.toString(fd));
+ sqlStmt.append(" ENCODING 'internal' ");
+ if (ec > 0) {
+ sqlStmt.append(" ESCAPECHAR '\\' ");
+ }
+ sqlStmt.append(" FORMAT 'Text' ");
+ sqlStmt.append(" INCLUDEZEROSECONDS TRUE ");
+ sqlStmt.append(" NULLVALUE 'null' ");
+ if (qc > 0) {
+ switch (qc) {
+ case '\'':
+ sqlStmt.append(" QUOTEDVALUE SINGLE ");
+ break;
+ case '\"':
+ sqlStmt.append(" QUOTEDVALUE DOUBLE ");
+ break;
+ default:
+ LOG.warn("Unsupported enclosed by character: " + qc + " - ignoring.");
+ }
+ }
+
+ sqlStmt.append(" MAXERRORS ").append(errorThreshold);
+
+ if (logDir != null) {
+ logDir = logDir.trim();
+ if (logDir.length() > 0) {
+ File logDirPath = new File(logDir);
+ logDirPath.mkdirs();
+ if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+ sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
+ } else {
+ throw new IOException("Unable to create log directory specified");
+ }
+ }
+ }
+
+ sqlStmt.append(") AS SELECT ");
+ if (cols == null || cols.length == 0) {
+ sqlStmt.append('*');
+ } else {
+ sqlStmt.append(cols[0]).append(' ');
+ for (int i = 0; i < cols.length; ++i) {
+ sqlStmt.append(',').append(cols[i]);
+ }
+ }
+ sqlStmt.append(" FROM ").append(dbc.getInputTableName()).append(' ');
+ sqlStmt.append("WHERE (DATASLICEID % ");
+ sqlStmt.append(numMappers).append(") = ").append(myId);
+ if (inputConds != null && inputConds.length() > 0) {
+ sqlStmt.append(" AND ( ").append(inputConds).append(')');
+ }
+
+ String stmt = sqlStmt.toString();
+ LOG.debug("SQL generated for external table import for data slice " + myId
+ + "=" + stmt);
+ return stmt;
+ }
+
+ private void initNetezzaExternalTableImport(int myId) throws IOException {
+
+ File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+
+ this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
+ String filename = fifoFile.toString();
+ NamedFifo nf;
+ // Create the FIFO itself.
+ try {
+ nf = new NamedFifo(this.fifoFile);
+ nf.create();
+ } catch (IOException ioe) {
+ // Command failed.
+ LOG.error("Could not create FIFO file " + filename);
+ this.fifoFile = null;
+ throw new IOException(
+ "Could not create FIFO for netezza external table import", ioe);
+ }
+ String sqlStmt = getSqlStatement(myId);
+ boolean cleanup = false;
+ try {
+ con = dbc.getConnection();
+ extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+ con, sqlStmt);
+ } catch (SQLException sqle) {
+ cleanup = true;
+ throw new IOException(sqle);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ } finally {
+ if (con != null && cleanup) {
+ try {
+ con.close();
+ } catch (Exception e) {
+ LOG.debug("Exception closing connection " + e.getMessage());
+ }
+ }
+ con = null;
+ }
+ extTableThread.start();
+ // We need to start the reader end first
+ recordReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(nf.getFile())));
+ }
+
+ public void map(Integer dataSliceId, NullWritable val, Context context)
+ throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+ dbc = new DBConfiguration(conf);
+ numMappers = ConfigurationHelper.getConfNumMaps(conf);
+ char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
+ initNetezzaExternalTableImport(dataSliceId);
+ counter = new PerfCounters();
+ counter.startClock();
+ Text outputRecord = new Text();
+ if (extTableThread.isAlive()) {
+ try {
+ String inputRecord = recordReader.readLine();
+ while (inputRecord != null) {
+ if (Thread.interrupted()) {
+ if (!extTableThread.isAlive()) {
+ break;
+ }
+ }
+ outputRecord.set(inputRecord + rd);
+ // May be we should set the output to be String for faster performance
+ // There is no real benefit in changing it to Text and then
+ // converting it back in our case
+ context.write(outputRecord, NullWritable.get());
+ counter.addBytes(1 + inputRecord.length());
+ inputRecord = recordReader.readLine();
+ }
+ } finally {
+ recordReader.close();
+ extTableThread.join();
+ counter.stopClock();
+ LOG.info("Transferred " + counter.toString());
+ if (extTableThread.hasExceptions()) {
+ extTableThread.printException();
+ throw new IOException(extTableThread.getExcepton());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
new file mode 100644
index 0000000..d3024e5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.sqoop.lib.SqoopRecord;
+
+/**
+ * Netezza external table export mapper whose input values are
+ * {@link SqoopRecord}s; each record is serialized into the external table
+ * FIFO by the base class.
+ */
+public class NetezzaExternalTableRecordExportMapper extends
+    NetezzaExternalTableExportMapper<LongWritable, SqoopRecord> {
+
+  /**
+   * Writes one record to the FIFO and then reports progress.
+   *
+   * @param key not used
+   * @param sqr the record to export
+   * @param context task context used for progress reporting
+   */
+  @Override
+  public void map(final LongWritable key, final SqoopRecord sqr,
+      final Context context) throws IOException, InterruptedException {
+    this.writeSqoopRecord(sqr);
+    context.progress();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
new file mode 100644
index 0000000..c703b97
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Netezza external table export mapper whose input values are text lines;
+ * each line is written to the external table FIFO by the base class.
+ */
+public class NetezzaExternalTableTextExportMapper extends
+    NetezzaExternalTableExportMapper<LongWritable, Text> {
+
+  /**
+   * Writes one text line to the FIFO and then reports progress.
+   *
+   * @param key not used
+   * @param text the line to export
+   * @param context task context used for progress reporting
+   */
+  @Override
+  public void map(final LongWritable key, final Text text,
+      final Context context) throws IOException, InterruptedException {
+    this.writeTextRecord(text);
+    context.progress();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
new file mode 100644
index 0000000..3a5df40
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple class for JDBC External table statement execution for Netezza. Even
+ * though the statements are execute only (no support for bind variables or
+ * resultsets, we use a two step process so that sql statement errors are caught
+ * during construction itself.
+ */
+public class NetezzaJDBCStatementRunner extends Thread {
+ public static final Log LOG = LogFactory
+ .getLog(NetezzaJDBCStatementRunner.class.getName());
+
+ private Connection con;
+ private Exception exception;
+ private PreparedStatement ps;
+ private Thread parent;
+
+ public boolean hasExceptions() {
+ return exception != null;
+ }
+
+ public void printException() {
+ if (exception != null) {
+ LOG.error("Errors encountered during external table JDBC processing");
+ LOG.error("Exception " + exception.getMessage(), exception);
+ }
+ }
+
+ public Throwable getExcepton() {
+ if (!hasExceptions()) {
+ return null;
+ }
+ return exception;
+ }
+
+ public NetezzaJDBCStatementRunner(Thread parent, Connection con,
+ String sqlStatement) throws SQLException {
+ this.parent = parent;
+ this.con = con;
+ this.ps = con.prepareStatement(sqlStatement);
+ this.exception = null;
+ }
+
+ public void run() {
+ boolean interruptParent = false;
+ try {
+
+ // Excecute the statement - this will make data to flow in the
+ // named pipes
+ ps.execute();
+
+ } catch (SQLException sqle) {
+ interruptParent = true;
+ LOG.error("Unable to execute external table export", sqle);
+ this.exception = sqle;
+ } finally {
+ if (con != null) {
+ try {
+ con.close();
+ } catch (Exception e) {
+ LOG.debug("Exception closing connection " + e.getMessage());
+ }
+ }
+ con = null;
+ }
+ if (interruptParent) {
+ this.parent.interrupt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
new file mode 100644
index 0000000..c4e0062
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.sqoop.manager.NetezzaManager;
+import org.apache.sqoop.mapreduce.DBWritable;
+import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.db.netezza.NetezzaDBDataSliceSplitter;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * Netezza specific DB input format.
+ *
+ * <p>When data slice aligned access is enabled, an input table name is set,
+ * more than one mapper is used and no boundary query is given, the splits are
+ * produced by {@link NetezzaDBDataSliceSplitter}. Otherwise the splitting of
+ * {@link DataDrivenDBInputFormat} is used.</p>
+ *
+ * @param <T> record type
+ */
+public class NetezzaDataDrivenDBInputFormat<T extends DBWritable> extends
+    DataDrivenDBInputFormat<T> implements Configurable {
+  private static final Log LOG = LogFactory
+      .getLog(NetezzaDataDrivenDBInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    int mapperCount = ConfigurationHelper.getJobNumMaps(job);
+    if (!isDataSliceSplitApplicable(mapperCount)) {
+      return super.getSplits(job);
+    }
+    // Split on data slice id only: the lower bound clause of each split
+    // restricts the data slices and the upper bound clause is a constant.
+    return new NetezzaDBDataSliceSplitter().split(getConf(), null, null);
+  }
+
+  /**
+   * Decides whether data slice aligned splitting replaces the base class
+   * splitting.
+   *
+   * @param mapperCount number of mappers configured for the job
+   * @return true only if aligned access is enabled, an input table name is
+   *     set, the mapper count is not 1 and no boundary query is set
+   */
+  private boolean isDataSliceSplitApplicable(int mapperCount) {
+    boolean alignedAccessRequested = getConf().getBoolean(
+        NetezzaManager.NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false);
+    if (!alignedAccessRequested) {
+      return false;
+    }
+    if (getDBConf().getInputTableName() == null || mapperCount == 1) {
+      return false;
+    }
+    String boundaryQuery = getDBConf().getInputBoundingQuery();
+    return boundaryQuery == null || boundaryQuery.isEmpty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
new file mode 100644
index 0000000..2a702d9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.DBWritable;
+import
+ org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableRecordExportMapper;
+import
+ org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableTextExportMapper;
+
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an export job using Netezza external tables in the mapper.
+ *
+ * <p>The job uses {@link NullOutputFormat}; the mappers returned by
+ * {@link #getMapperClass()} load the data into Netezza themselves.</p>
+ */
+public class NetezzaExternalTableExportJob extends ExportJobBase {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableExportJob.class.getName());
+
+  public NetezzaExternalTableExportJob(final ExportJobContext context) {
+    super(context, null, null, NullOutputFormat.class);
+  }
+
+  /**
+   * Configure the inputformat to use for the job.
+   *
+   * <p>Copies the input-side delimiter settings into the configuration keys
+   * read by the Netezza external table mappers, configures the database
+   * connection, registers the table with its escaped column names and then
+   * delegates to the base class.</p>
+   */
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws ClassNotFoundException,
+      IOException {
+
+    // Configure the delimiters, etc.
+    Configuration conf = job.getConfiguration();
+    conf.setInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY,
+        options.getInputFieldDelim());
+    conf.setInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY,
+        options.getInputRecordDelim());
+    conf.setInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY,
+        options.getInputEnclosedBy());
+    // Netezza uses \ as the escape character, so whenever any escape
+    // character is requested, force the use of \ (including when \ itself
+    // was requested). The input escape setting is preferred, like the
+    // delimiters above; the output escape setting is still honored when no
+    // input escape is given.
+    int escapeChar = options.getInputEscapedBy();
+    if (escapeChar == 0) {
+      escapeChar = options.getOutputEscapedBy();
+    }
+    if (escapeChar > 0) {
+      if (escapeChar != '\\') {
+        LOG.info("Setting escaped char to \\ for Netezza "
+            + "external table export");
+      }
+      conf.setInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, '\\');
+    }
+    conf.setBoolean(DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(conf, mgr.getDriverClass(),
+          options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(conf, mgr.getDriverClass(),
+          options.getConnectString(), username, options.getPassword());
+    }
+
+    String[] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
+
+    String[] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      }
+    }
+
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, null,
+        null, sqlColNames);
+
+    // Configure the actual InputFormat to use.
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+  }
+
+  /**
+   * Selects the mapper from the input file type.
+   *
+   * @return the SqoopRecord mapper for SequenceFile input, otherwise the
+   *     text mapper
+   */
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    if (inputIsSequenceFiles()) {
+      return NetezzaExternalTableRecordExportMapper.class;
+    } else {
+      return NetezzaExternalTableTextExportMapper.class;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
new file mode 100644
index 0000000..7ee6f70
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.mapreduce.DBWritable;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat;
+import org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableImportMapper;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an import job using netezza external tables in the mapper.
+ */
+public class NetezzaExternalTableImportJob extends ImportJobBase {
+
+ public NetezzaExternalTableImportJob(final SqoopOptions opts,
+ ImportJobContext context) {
+ super(opts, NetezzaExternalTableImportMapper.class,
+ NetezzaExternalTableInputFormat.class, RawKeyTextOutputFormat.class,
+ context);
+ }
+
+ /**
+ * Configure the inputformat to use for the job.
+ */
+
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol) throws ClassNotFoundException,
+ IOException {
+
+ ConnManager mgr = getContext().getConnManager();
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+ options.getConnectString(), username, options.getPassword());
+ }
+
+ String[] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ String[] sqlColNames = null;
+ if (null != colNames) {
+ sqlColNames = new String[colNames.length];
+ for (int i = 0; i < colNames.length; i++) {
+ sqlColNames[i] = mgr.escapeColName(colNames[i]);
+ }
+ }
+
+ // It's ok if the where clause is null in DBInputFormat.setInput.
+ String whereClause = options.getWhereClause();
+
+ // We can't set the class properly in here, because we may not have the
+ // jar loaded in this JVM. So we start by calling setInput() with
+ // DBWritable and then overriding the string manually.
+
+ // Note that mysqldump also does *not* want a quoted table name.
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName,
+ whereClause, mgr.escapeColName(splitByCol), sqlColNames);
+
+ Configuration conf = job.getConfiguration();
+ conf.setInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY,
+ options.getOutputFieldDelim());
+ conf.setInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY,
+ options.getOutputRecordDelim());
+ conf.setInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY,
+ options.getOutputEnclosedBy());
+ // Netezza uses \ as the escape character. Force the use of it
+ int escapeChar = options.getOutputEscapedBy();
+ if (escapeChar > 0 && escapeChar != '\\') {
+ LOG.info("Setting escaped char to \\ for Netezza external table import");
+ conf.setInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, '\\');
+ }
+ conf.setBoolean(DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY,
+ options.isOutputEncloseRequired());
+
+ LOG.debug("Using InputFormat: " + inputFormatClass);
+ job.setInputFormatClass(getInputFormatClass());
+ }
+
+ /**
+ * Set the mapper class implementation to use in the job, as well as any
+ * related configuration (e.g., map output types).
+ */
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+ job.setMapperClass(getMapperClass());
+ job.setOutputKeyClass(String.class);
+ job.setOutputValueClass(NullWritable.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
new file mode 100644
index 0000000..631c664
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * InputFormat designed to take data-driven splits and use them in the netezza
+ * external table import invocation running in the mapper.
+ *
+ * The key emitted by this mapper is the data slice id to for the Netezza
+ * external table query.
+ */
+public class NetezzaExternalTableInputFormat extends
+ InputFormat<Integer, NullWritable> {
+
+ public static final Log LOG = LogFactory
+ .getLog(NetezzaExternalTableInputFormat.class.getName());
+
+ /**
+ * A RecordReader that just takes the WHERE conditions from the DBInputSplit
+ * and relates them to the mapper as a single input record.
+ */
+ public static class NetezzaExternalTableRecordReader extends
+ RecordReader<Integer, NullWritable> {
+
+ private boolean delivered;
+ private InputSplit split;
+
+ public NetezzaExternalTableRecordReader(InputSplit split) {
+ initialize(split, null);
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ boolean hasNext = !delivered;
+ delivered = true;
+ return hasNext;
+ }
+
+ @Override
+ public Integer getCurrentKey() {
+ return ((NetezzaExternalTableInputSplit) this.split).getDataSliceId();
+ }
+
+ @Override
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public float getProgress() {
+ return delivered ? 1.0f : 0.0f;
+ }
+
+ @Override
+ public void initialize(InputSplit s, TaskAttemptContext context) {
+ this.split = s;
+ this.delivered = false;
+ }
+ }
+
+ public RecordReader<Integer, NullWritable> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ return new NetezzaExternalTableRecordReader(split);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException,
+ InterruptedException {
+ int targetNumTasks = ConfigurationHelper.getJobNumMaps(context);
+ List<InputSplit> splits = new ArrayList<InputSplit>(targetNumTasks);
+ for (int i = 0; i < targetNumTasks; ++i) {
+ splits.add(new NetezzaExternalTableInputSplit(i));
+ }
+ return splits;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
new file mode 100644
index 0000000..95cdcba
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Netezza dataslice specific input splitter.
+ *
+ */
+public class NetezzaExternalTableInputSplit extends InputSplit implements
+ Writable {
+
+ public static final Log LOG = LogFactory
+ .getLog(NetezzaExternalTableInputSplit.class.getName());
+
+ private int dataSliceId; // The datasliceid associated with this split
+
+ public NetezzaExternalTableInputSplit() {
+ this.dataSliceId = 0;
+ }
+
+ public NetezzaExternalTableInputSplit(int dataSliceId) {
+ this.dataSliceId = dataSliceId;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0L;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[0];
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ dataSliceId = input.readInt();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(dataSliceId);
+ }
+
+ public Integer getDataSliceId() {
+ return dataSliceId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
new file mode 100644
index 0000000..bbcd138
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.junit.After;
+import org.junit.Before;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestExport;
+
+/**
+ * Test the DirectNetezzaManager implementation's exportJob() functionality.
+ */
+public class DirectNetezzaExportManualTest extends TestExport {
+
+ public static final Log LOG = LogFactory.getLog(
+ DirectNetezzaExportManualTest.class.getName());
+
+ static final String TABLE_PREFIX = "EMPNZ";
+
+ // instance variables populated during setUp, used during tests.
+ private DirectNetezzaManager manager;
+ private Connection conn;
+
+ @Override
+ protected Connection getConnection() {
+ return conn;
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return NetezzaTestUtils.getNZConnectString();
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return TABLE_PREFIX;
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE " + tableName;
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ conn = getConnection();
+ SqoopOptions options = new SqoopOptions(
+ NetezzaTestUtils.getNZConnectString(), getTableName());
+ options.setUsername(NetezzaTestUtils.getNZUser());
+ options.setPassword(NetezzaTestUtils.getNZPassword());
+ this.manager = new DirectNetezzaManager(options);
+
+ try {
+ this.conn = manager.getConnection();
+ this.conn.setAutoCommit(false);
+ } catch (SQLException sqlE) {
+ LOG.error("Encountered SQL Exception: " + sqlE);
+ sqlE.printStackTrace();
+ fail("SQLException when running test setUp(): " + sqlE);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ super.tearDown();
+ if (null != manager) {
+ try {
+ manager.close();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ }
+ }
+ this.conn = null;
+ this.manager = null;
+
+ }
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+
+ String [] moreArgs = new String[extraArgs.length + 4];
+ int i = 0;
+ for (i = 0; i < extraArgs.length; i++) {
+ moreArgs[i] = extraArgs[i];
+ }
+
+ // Add username argument for netezza.
+ moreArgs[i++] = "--username";
+ moreArgs[i++] = NetezzaTestUtils.getNZUser();
+ moreArgs[i++] = "--password";
+ moreArgs[i++] = NetezzaTestUtils.getNZPassword();
+
+ return super.getCodeGenArgv(moreArgs);
+ }
+
+ @Override
+ protected String [] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
+
+ String [] subArgv = newStrArray(additionalArgv, "--direct",
+ "--username", NetezzaTestUtils.getNZUser(), "--password",
+ NetezzaTestUtils.getNZPassword());
+ return super.getArgv(includeHadoopFlags, rowsPerStatement,
+ statementsPerTx, subArgv);
+ }
+
+
+
+ /**
+ * Create the table definition to export to, removing any prior table. By
+ * specifying ColumnGenerator arguments, you can add extra columns to the
+ * table of arbitrary type.
+ */
+ @Override
+ public void createTable(ColumnGenerator... extraColumns) throws SQLException {
+ PreparedStatement statement = conn.prepareStatement(
+ getDropTableStatement(getTableName()), ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } catch (SQLException sqle) {
+ conn.rollback();
+ } finally {
+ statement.close();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(getTableName());
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ int colNum = 0;
+ for (ColumnGenerator gen : extraColumns) {
+ sb.append(", " + forIdx(colNum++) + " " + gen.getType());
+ }
+ sb.append(")");
+
+ statement = conn.prepareStatement(sb.toString(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+ /**
+ * Test an authenticated export using netezza external table import.
+ */
+ public void testAuthExport() throws IOException, SQLException {
+ SqoopOptions options = new SqoopOptions(
+ NetezzaTestUtils.getNZConnectString(),
+ getTableName());
+ options.setUsername(NetezzaTestUtils.getNZUser());
+ options.setPassword(NetezzaTestUtils.getNZPassword());
+
+ manager = new DirectNetezzaManager(options);
+
+ Connection connection = null;
+ Statement st = null;
+
+ String tableName = getTableName();
+
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ st = connection.createStatement();
+
+ // create a target database table.
+ try {
+ st.executeUpdate("DROP TABLE " + tableName);
+ } catch(SQLException sqle) {
+ LOG.info("Ignoring exception from DROP TABLE : " + sqle.getMessage());
+ connection.rollback();
+ }
+
+ LOG.info("Creating table " + tableName);
+
+ st.executeUpdate("CREATE TABLE " + tableName + " ("
+ + "id INT NOT NULL PRIMARY KEY, "
+ + "msg VARCHAR(24) NOT NULL)");
+
+ connection.commit();
+ LOG.info("Created table " + tableName);
+
+ // Write a file containing a record to export.
+ Path tablePath = getTablePath();
+ Path filePath = new Path(tablePath, "datafile");
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(tablePath);
+ OutputStream os = fs.create(filePath);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ w.write(getRecordLine(0));
+ w.write(getRecordLine(1));
+ w.write(getRecordLine(2));
+ w.close();
+ os.close();
+
+ // run the export and verify that the results are good.
+ runExport(getArgv(true, 10, 10,
+ "--username", NetezzaTestUtils.getNZUser(),
+ "--password", NetezzaTestUtils.getNZPassword(),
+ "--connect", NetezzaTestUtils.getNZConnectString()));
+ verifyExport(3, connection);
+ } catch (SQLException sqlE) {
+ LOG.error("Encountered SQL Exception: " + sqlE);
+ sqlE.printStackTrace();
+ fail("SQLException when accessing target table. " + sqlE);
+ } finally {
+ try {
+ if (null != st) {
+ st.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("Got SQLException when closing connection: " + sqlE);
+ }
+ }
+ }
+
+
+ @Override
+ public void testMultiMapTextExportWithStaging()
+ throws IOException, SQLException {
+ // disable this test as staging is not supported in direct mode
+ }
+
+ @Override
+ public void testMultiTransactionWithStaging()
+ throws IOException, SQLException {
+ // disable this test as staging is not supported in direct mode
+ }
+
+ @Override
+ public void testColumnsExport()
+ throws IOException, SQLException {
+ // disable this test as it is not supported in direct mode
+ }
+
+ @Override
+ public void testSequenceFileExport()
+ throws IOException, SQLException {
+ // disable this test as it is not supported in direct mode
+ }
+}