You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/04 03:38:53 UTC
git commit: SQOOP-999: Support bulk load from HDFS to PostgreSQL using COPY ... FROM
Updated Branches:
refs/heads/trunk a2a02076a -> fb29b8f9f
SQOOP-999: Support bulk load from HDFS to PostgreSQL using COPY ... FROM
(Masatake Iwasaki via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fb29b8f9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fb29b8f9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fb29b8f9
Branch: refs/heads/trunk
Commit: fb29b8f9fcd45c98857fe44cfc3fe294f2fc6f84
Parents: a2a0207
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jul 3 18:38:14 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jul 3 18:38:14 2013 -0700
----------------------------------------------------------------------
ivy.xml | 3 +
ivy/libraries.properties | 1 +
.../sqoop/manager/DirectPostgresqlManager.java | 21 ++-
.../postgresql/PostgreSQLCopyExportJob.java | 110 +++++++++++++
.../postgresql/PostgreSQLCopyExportMapper.java | 160 +++++++++++++++++++
.../DirectPostgreSQLExportManualTest.java | 160 +++++++++++++++++++
6 files changed, 453 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 750adfc..63fdc80 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -182,6 +182,9 @@ under the License.
<artifact name="hcatalog-core" type="jar"/>
</dependency>
+ <dependency org="org.postgresql" name="postgresql"
+ rev="${postgresql.version}" conf="common->default" />
+
<exclude org="org.apache.hadoop" module="avro"/>
<exclude org="commons-daemon" module="commons-daemon" />
<exclude type="pom" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 430d554..df1a08f 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -42,3 +42,4 @@ mvn.version=2.0.10
rats-lib.version=0.5.1
+postgresql.version=9.2-1003-jdbc4
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c085218..8d4a097 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -35,21 +35,27 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.sqoop.cli.RelatedOptions;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+import org.apache.sqoop.mapreduce.postgresql.PostgreSQLCopyExportJob;
import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.SubstitutionUtils;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
+import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.util.AsyncSink;
import com.cloudera.sqoop.util.DirectImportUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.Executor;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.JdbcUrl;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.PerfCounters;
-import org.apache.sqoop.util.SubstitutionUtils;
+
/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -532,7 +538,7 @@ public class DirectPostgresqlManager
@Override
public boolean supportsStagingForExport() {
- return false;
+ return true;
}
// CHECKSTYLE:ON
@@ -569,4 +575,25 @@ public class DirectPostgresqlManager
return extraOptions;
}
+
+ /**
+ * Exports a table through {@link PostgreSQLCopyExportJob}, which writes
+ * the records with the PostgreSQL JDBC Copy API.
+ *
+ * @param context export job context; this manager is registered as its
+ * connection manager before the job is started
+ * @throws IOException propagated from the export job
+ * @throws ExportException propagated from the export job
+ */
+ @Override
+ public void exportTable(ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ PostgreSQLCopyExportJob job =
+ new PostgreSQLCopyExportJob(context,
+ null, // PostgreSQLCopyExportJob supplies the mapper
+ ExportInputFormat.class,
+ NullOutputFormat.class);
+ job.runExport();
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
new file mode 100644
index 0000000..483949f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.mapreduce.JdbcExportJob;
+
+
+
+/**
+ * Run an export using PostgreSQL JDBC Copy API.
+ */
+public class PostgreSQLCopyExportJob extends JdbcExportJob {
+ public static final Log LOG =
+ LogFactory.getLog(PostgreSQLCopyExportJob.class.getName());
+
+ public PostgreSQLCopyExportJob(final ExportJobContext context) {
+ super(context);
+ }
+
+ public PostgreSQLCopyExportJob(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
+ /**
+ * The mapper class given to the constructor is not used by
+ * {@link #configureMapper}; this job always runs
+ * {@link PostgreSQLCopyExportMapper}.
+ */
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ return PostgreSQLCopyExportMapper.class;
+ }
+
+ /**
+ * Sets up the COPY mapper. HCatalog jobs and Avro data file input are
+ * rejected; every other input file type uses {@link #getMapperClass()}.
+ *
+ * @throws IOException for HCatalog jobs or Avro data file input
+ */
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+ if (isHCatJob) {
+ throw new IOException("Sqoop-HCatalog Integration is not supported.");
+ }
+ switch (getInputFileType()) {
+ case AVRO_DATA_FILE:
+ throw new IOException("Avro data file is not supported.");
+ case SEQUENCE_FILE:
+ case UNKNOWN:
+ default:
+ job.setMapperClass(getMapperClass());
+ }
+
+ // Concurrent writes of the same records would be problematic.
+ ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ }
+
+ /**
+ * Copies the null string, delimiter, enclosing and escape options into the
+ * job configuration under {@code postgresql.*} keys for
+ * {@link PostgreSQLCopyExportMapper}.
+ */
+ @Override
+ protected void propagateOptionsToJob(Job job) {
+ super.propagateOptionsToJob(job);
+ SqoopOptions opts = context.getOptions();
+ Configuration conf = job.getConfiguration();
+ // Export input is described by --input-null-string; keep --null-string
+ // as a fallback so existing invocations behave as before.
+ String nullString = opts.getInNullStringValue();
+ if (nullString == null) {
+ nullString = opts.getNullStringValue();
+ }
+ if (nullString != null) {
+ conf.set("postgresql.null.string", nullString);
+ }
+ setDelimiter("postgresql.input.field.delim",
+ opts.getInputFieldDelim(), conf);
+ setDelimiter("postgresql.input.record.delim",
+ opts.getInputRecordDelim(), conf);
+ setDelimiter("postgresql.input.enclosedby",
+ opts.getInputEnclosedBy(), conf);
+ setDelimiter("postgresql.input.escapedby",
+ opts.getInputEscapedBy(), conf);
+ conf.setBoolean("postgresql.input.encloserequired",
+ opts.isInputEncloseRequired());
+ }
+
+ /**
+ * Stores {@code val} under {@code prop}, skipping
+ * {@link DelimiterSet#NULL_CHAR}, which is treated as "not set" so the key
+ * stays absent from the configuration.
+ */
+ private void setDelimiter(String prop, char val, Configuration conf) {
+ if (val != DelimiterSet.NULL_CHAR) {
+ conf.set(prop, String.valueOf(val));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
new file mode 100644
index 0000000..d10cadb
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+import org.postgresql.PGConnection;
+import org.postgresql.copy.CopyManager;
+import org.postgresql.copy.CopyIn;
+
+
+/**
+ * Mapper that exports rows from HDFS to a PostgreSQL database at high speed
+ * with PostgreSQL Copy API.
+ *
+ * map() methods read from SequenceFiles (containing existing SqoopRecords)
+ * or text files (containing delimited lines)
+ * and deliver these results to the CopyIn object of PostgreSQL JDBC.
+ */
+public class PostgreSQLCopyExportMapper
+ extends AutoProgressMapper<LongWritable, Writable,
+ NullWritable, NullWritable> {
+ public static final Log LOG =
+ LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName());
+
+ private Configuration conf;
+ private DBConfiguration dbConf;
+ private Connection conn = null;
+ private CopyIn copyin = null;
+ private StringBuilder line = new StringBuilder();
+ private DelimiterSet delimiters =
+ new DelimiterSet(',', '\n',
+ DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false);
+
+ public PostgreSQLCopyExportMapper() {
+ }
+
+ /**
+ * Opens a JDBC connection and starts a {@code COPY ... FROM STDIN}
+ * operation on the output table.
+ *
+ * @throws IOException if the driver cannot be loaded or the copy cannot be
+ * started; an already opened connection is closed first
+ */
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+ conf = context.getConfiguration();
+ dbConf = new DBConfiguration(conf);
+ CopyManager cm = null;
+ try {
+ conn = dbConf.getConnection();
+ cm = ((PGConnection)conn).getCopyAPI();
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
+ // The connection may already be open if only getCopyAPI() failed.
+ closeQuietly();
+ throw new IOException(ex);
+ }
+ String sql = buildCopySql();
+ try {
+ LOG.debug("Starting export with copy: " + sql);
+ copyin = cm.copyIn(sql);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
+ closeQuietly();
+ throw new IOException(ex);
+ }
+ }
+
+ /**
+ * Builds the COPY statement from the {@code postgresql.*} options in the
+ * job configuration, using CSV format and UTF-8 encoding.
+ */
+ private String buildCopySql() {
+ StringBuilder sql = new StringBuilder();
+ sql.append("COPY ");
+ sql.append(dbConf.getOutputTableName());
+ sql.append(" FROM STDIN WITH (");
+ sql.append(" ENCODING 'UTF-8' ");
+ sql.append(", FORMAT csv ");
+ sql.append(", DELIMITER ");
+ sql.append(toSqlLiteral(conf.get("postgresql.input.field.delim", ",")));
+ sql.append(", QUOTE ");
+ sql.append(toSqlLiteral(conf.get("postgresql.input.enclosedby", "\"")));
+ sql.append(", ESCAPE ");
+ sql.append(toSqlLiteral(conf.get("postgresql.input.escapedby", "\"")));
+ String nullString = conf.get("postgresql.null.string");
+ if (nullString != null) {
+ sql.append(", NULL ");
+ sql.append(toSqlLiteral(nullString));
+ }
+ sql.append(")");
+ return sql.toString();
+ }
+
+ /**
+ * Renders {@code value} as a PostgreSQL escape string constant
+ * ({@code E'...'}). Backslashes and single quotes are doubled, so a value
+ * such as a single-quote enclosing character cannot end the literal early,
+ * independent of the server's standard_conforming_strings setting.
+ */
+ static String toSqlLiteral(String value) {
+ return "E'" + value.replace("\\", "\\\\").replace("'", "''") + "'";
+ }
+
+ /**
+ * Appends one record to the running COPY operation.
+ */
+ @Override
+ public void map(LongWritable key, Writable value, Context context)
+ throws IOException, InterruptedException {
+ line.setLength(0);
+ line.append(value.toString());
+ if (value instanceof Text) {
+ // Text input is assumed to arrive without a line terminator; COPY needs
+ // one per row. Use '\n' rather than the platform separator so the
+ // stream does not depend on the task host's OS.
+ line.append('\n');
+ }
+ try {
+ byte[] data = line.toString().getBytes("UTF-8");
+ copyin.writeToCopy(data, 0, data.length);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
+ closeQuietly();
+ throw new IOException(ex);
+ }
+ }
+
+ /**
+ * Finishes the COPY operation and closes the connection. If the copy was
+ * never started, or was already abandoned by close(), only the connection
+ * cleanup is performed.
+ */
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ if (copyin == null) {
+ close();
+ return;
+ }
+ try {
+ copyin.endCopy();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
+ closeQuietly();
+ throw new IOException(ex);
+ }
+ close();
+ }
+
+ /**
+ * Closes the JDBC connection, if open, and drops the CopyIn reference so a
+ * later cleanup() does not try to finish a copy on a closed connection.
+ *
+ * @throws IOException if closing the connection fails
+ */
+ void close() throws IOException {
+ copyin = null;
+ if (conn != null) {
+ Connection toClose = conn;
+ conn = null;
+ try {
+ toClose.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+ throw new IOException(ex);
+ }
+ }
+ }
+
+ /**
+ * Variant of close() for error paths: a close failure is already logged by
+ * close() and must not replace the error being reported.
+ */
+ private void closeQuietly() {
+ try {
+ close();
+ } catch (IOException ignored) {
+ // Logged in close(); the caller rethrows the original failure.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
new file mode 100644
index 0000000..52095ef
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the DirectPostgresqlManager implementations.
+ * DirectPostgresqlManager exports through the PostgreSQL JDBC driver's
+ * Copy API.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with
+ * -Dtestcase=DirectPostgreSQLExportManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database, and can check
+ * that the account works by connecting with psql:
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ psql -U sqooptest sqooptest
+ *
+ */
+public class DirectPostgreSQLExportManualTest extends TestExport {
+
+ public static final Log LOG =
+ LogFactory.getLog(DirectPostgreSQLExportManualTest.class.getName());
+ private DBConfiguration dbConf;
+
+ static final String HOST_URL =
+ System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+ "jdbc:postgresql://localhost/");
+ static final String DATABASE =
+ System.getProperty("sqoop.test.postgresql.database", "sqooptest");
+ static final String USERNAME =
+ System.getProperty("sqoop.test.postgresql.username", "sqooptest");
+ static final String CONNECT_STRING = HOST_URL + DATABASE;
+
+ public DirectPostgreSQLExportManualTest() {
+ JobConf conf = new JobConf(getConf());
+ DBConfiguration.configureDB(conf,
+ "org.postgresql.Driver",
+ getConnectString(),
+ getUserName(),
+ (String) null, (Integer) null);
+ dbConf = new DBConfiguration(conf);
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return CONNECT_STRING;
+ }
+
+ protected String getUserName() {
+ return USERNAME;
+ }
+
+ @Override
+ protected String getTablePrefix() {
+ return super.getTablePrefix().toLowerCase();
+ }
+
+ @Override
+ protected String getTableName() {
+ return super.getTableName().toLowerCase();
+ }
+
+ @Override
+ public String getStagingTableName() {
+ return super.getStagingTableName().toLowerCase();
+ }
+
+ /**
+ * Opens a connection with auto-commit disabled and extra_float_digits set
+ * to 0. Returns null (after logging) if the connection cannot be set up.
+ */
+ @Override
+ protected Connection getConnection() {
+ Connection conn = null;
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ PreparedStatement stmt =
+ conn.prepareStatement("SET extra_float_digits TO 0");
+ try {
+ stmt.executeUpdate();
+ } finally {
+ stmt.close();
+ }
+ conn.commit();
+ return conn;
+ } catch (SQLException sqlE) {
+ LOG.error("Could not get connection to test server: " + sqlE);
+ closeConnectionQuietly(conn);
+ return null;
+ } catch (ClassNotFoundException cnfE) {
+ LOG.error("Could not find driver class: " + cnfE);
+ return null;
+ }
+ }
+
+ /**
+ * Closes {@code conn} if it is non-null; a close failure is only logged so
+ * that the original setup error stays the one reported.
+ */
+ private static void closeConnectionQuietly(Connection conn) {
+ if (conn == null) {
+ return;
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.warn("Could not close connection: " + e);
+ }
+ }
+
+ @Override
+ protected String getDropTableStatement(String tableName) {
+ return "DROP TABLE IF EXISTS " + tableName;
+ }
+
+ @Override
+ protected String[] getArgv(boolean includeHadoopFlags,
+ int rowsPerStatement,
+ int statementsPerTx,
+ String... additionalArgv) {
+ ArrayList<String> args =
+ new ArrayList<String>(Arrays.asList(additionalArgv));
+ args.add("--username");
+ args.add(getUserName());
+ args.add("--direct");
+ return super.getArgv(includeHadoopFlags,
+ rowsPerStatement,
+ statementsPerTx,
+ args.toArray(new String[0]));
+ }
+
+ @Override
+ protected String [] getCodeGenArgv(String... extraArgs) {
+ ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+ args.add("--username");
+ args.add(getUserName());
+ return super.getCodeGenArgv(args.toArray(new String[0]));
+ }
+
+ @Override
+ public void testColumnsExport() throws IOException, SQLException {
+ // Direct export does not support --columns option.
+ }
+}