You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/04 03:38:53 UTC

git commit: SQOOP-999: Support bulk load from HDFS to PostgreSQL using COPY ... FROM

Updated Branches:
  refs/heads/trunk a2a02076a -> fb29b8f9f


SQOOP-999: Support bulk load from HDFS to PostgreSQL using COPY ... FROM

(Masatake Iwasaki via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fb29b8f9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fb29b8f9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fb29b8f9

Branch: refs/heads/trunk
Commit: fb29b8f9fcd45c98857fe44cfc3fe294f2fc6f84
Parents: a2a0207
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jul 3 18:38:14 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jul 3 18:38:14 2013 -0700

----------------------------------------------------------------------
 ivy.xml                                         |   3 +
 ivy/libraries.properties                        |   1 +
 .../sqoop/manager/DirectPostgresqlManager.java  |  21 ++-
 .../postgresql/PostgreSQLCopyExportJob.java     | 110 +++++++++++++
 .../postgresql/PostgreSQLCopyExportMapper.java  | 160 +++++++++++++++++++
 .../DirectPostgreSQLExportManualTest.java       | 160 +++++++++++++++++++
 6 files changed, 453 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 750adfc..63fdc80 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -182,6 +182,9 @@ under the License.
       <artifact name="hcatalog-core" type="jar"/>
     </dependency>
 
+    <dependency org="org.postgresql" name="postgresql"
+      rev="${postgresql.version}" conf="common->default" />
+
     <exclude org="org.apache.hadoop" module="avro"/>
     <exclude org="commons-daemon" module="commons-daemon" />
     <exclude type="pom" />

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 430d554..df1a08f 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -42,3 +42,4 @@ mvn.version=2.0.10
 
 rats-lib.version=0.5.1
 
+postgresql.version=9.2-1003-jdbc4

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c085218..8d4a097 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -35,21 +35,27 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.sqoop.cli.RelatedOptions;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+import org.apache.sqoop.mapreduce.postgresql.PostgreSQLCopyExportJob;
 import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.SubstitutionUtils;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.io.SplittableBufferedWriter;
+import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.util.AsyncSink;
 import com.cloudera.sqoop.util.DirectImportUtils;
 import com.cloudera.sqoop.util.ErrorableAsyncSink;
 import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.Executor;
 import com.cloudera.sqoop.util.ImportException;
 import com.cloudera.sqoop.util.JdbcUrl;
 import com.cloudera.sqoop.util.LoggingAsyncSink;
 import com.cloudera.sqoop.util.PerfCounters;
-import org.apache.sqoop.util.SubstitutionUtils;
+
 
 /**
  * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -532,7 +538,7 @@ public class DirectPostgresqlManager
 
   @Override
   public boolean supportsStagingForExport() {
-    return false;
+    return true;
   }
   // CHECKSTYLE:ON
 
@@ -569,4 +575,15 @@ public class DirectPostgresqlManager
 
     return extraOptions;
   }
+
+  public void exportTable(ExportJobContext context)
+    throws IOException, ExportException {
+    context.setConnManager(this);
+    PostgreSQLCopyExportJob job =
+      new PostgreSQLCopyExportJob(context,
+                                  null,
+                                  ExportInputFormat.class,
+                                  NullOutputFormat.class);
+    job.runExport();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
new file mode 100644
index 0000000..483949f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.mapreduce.JdbcExportJob;
+
+
+
+/**
+ * Run an export using PostgreSQL JDBC Copy API.
+ */
+public class PostgreSQLCopyExportJob extends JdbcExportJob {
+  public static final Log LOG =
+    LogFactory.getLog(PostgreSQLCopyExportJob.class.getName());
+
+  public PostgreSQLCopyExportJob(final ExportJobContext context) {
+    super(context);
+  }
+
+  public PostgreSQLCopyExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return PostgreSQLCopyExportMapper.class;
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+    if (isHCatJob) {
+      throw new IOException("Sqoop-HCatalog Integration is not supported.");
+    }
+    switch (getInputFileType()) {
+      case AVRO_DATA_FILE:
+        throw new IOException("Avro data file is not supported.");
+      case SEQUENCE_FILE:
+      case UNKNOWN:
+      default:
+        job.setMapperClass(getMapperClass());
+    }
+
+    // Concurrent writes of the same records would be problematic.
+    ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+  }
+
+  protected void propagateOptionsToJob(Job job) {
+    super.propagateOptionsToJob(job);
+    SqoopOptions opts = context.getOptions();
+    Configuration conf = job.getConfiguration();
+    if (opts.getNullStringValue() != null) {
+      conf.set("postgresql.null.string", opts.getNullStringValue());
+    }
+    setDelimiter("postgresql.input.field.delim",
+                 opts.getInputFieldDelim(), conf);
+    setDelimiter("postgresql.input.record.delim",
+                 opts.getInputRecordDelim(), conf);
+    setDelimiter("postgresql.input.enclosedby",
+                 opts.getInputEnclosedBy(), conf);
+    setDelimiter("postgresql.input.escapedby",
+                 opts.getInputEscapedBy(), conf);
+    conf.setBoolean("postgresql.input.encloserequired",
+                    opts.isInputEncloseRequired());
+  }
+
+  private void setDelimiter(String prop, char val, Configuration conf) {
+    switch (val) {
+    case DelimiterSet.NULL_CHAR:
+      break;
+    case '\t':
+    default:
+      conf.set(prop, String.valueOf(val));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
new file mode 100644
index 0000000..d10cadb
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+import org.postgresql.PGConnection;
+import org.postgresql.copy.CopyManager;
+import org.postgresql.copy.CopyIn;
+
+
+/**
+ * Mapper that export rows from HDFS to a PostgreSQL database at high speed
+ * with PostgreSQL Copy API.
+ *
+ * map() methods read from SequenceFiles (containing existing SqoopRecords)
+ * or text files (containing delimited lines)
+ * and deliver these results to the CopyIn object of PostgreSQL JDBC.
+ */
+public class PostgreSQLCopyExportMapper
+    extends AutoProgressMapper<LongWritable, Writable,
+                               NullWritable, NullWritable> {
+  public static final Log LOG =
+    LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName());
+
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Connection conn = null;
+  private CopyIn copyin = null;
+  private StringBuilder line = new StringBuilder();
+  private DelimiterSet delimiters =
+    new DelimiterSet(',', '\n',
+                     DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false);
+
+  public PostgreSQLCopyExportMapper() {
+  }
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+
+    super.setup(context);
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    CopyManager cm = null;
+    try {
+      conn = dbConf.getConnection();
+      cm = ((PGConnection)conn).getCopyAPI();
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
+      throw new IOException(ex);
+    }
+    try {
+      StringBuilder sql = new StringBuilder();
+      sql.append("COPY ");
+      sql.append(dbConf.getOutputTableName());
+      sql.append(" FROM STDIN WITH (");
+      sql.append(" ENCODING 'UTF-8' ");
+      sql.append(", FORMAT csv ");
+      sql.append(", DELIMITER ");
+      sql.append("'");
+      sql.append(conf.get("postgresql.input.field.delim", ","));
+      sql.append("'");
+      sql.append(", QUOTE ");
+      sql.append("'");
+      sql.append(conf.get("postgresql.input.enclosedby", "\""));
+      sql.append("'");
+      sql.append(", ESCAPE ");
+      sql.append("'");
+      sql.append(conf.get("postgresql.input.escapedby", "\""));
+      sql.append("'");
+      if (conf.get("postgresql.null.string") != null) {
+        sql.append(", NULL ");
+        sql.append("'");
+        sql.append(conf.get("postgresql.null.string"));
+        sql.append("'");
+      }
+      sql.append(")");
+      LOG.debug("Starting export with copy: " + sql);
+      copyin = cm.copyIn(sql.toString());
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
+      close();
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public void map(LongWritable key, Writable value, Context context)
+    throws IOException, InterruptedException {
+    line.setLength(0);
+    line.append(value.toString());
+    if (value instanceof Text) {
+      line.append(System.getProperty("line.separator"));
+    }
+    try {
+      byte[]data = line.toString().getBytes("UTF-8");
+      copyin.writeToCopy(data, 0, data.length);
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
+      close();
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    try {
+      copyin.endCopy();
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
+      throw new IOException(ex);
+    }
+    close();
+  }
+
+  void close() throws IOException {
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException ex) {
+        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+        throw new IOException(ex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
new file mode 100644
index 0000000..52095ef
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the DirectPostgresqlManager implementations.
+ * DirectPostgresqlManager uses JDBC driver to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with
+ * -Dtestcase=DirectPostgreSQLExportManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database and tablespace,
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ psql -U sqooptest sqooptest
+ *
+ */
+public class DirectPostgreSQLExportManualTest extends TestExport {
+
+  public static final Log LOG =
+      LogFactory.getLog(DirectPostgreSQLExportManualTest.class.getName());
+  private DBConfiguration dbConf;
+
+  static final String HOST_URL =
+    System.getProperty("sqoop.test.postgresql.connectstring.host_url",
+                       "jdbc:postgresql://localhost/");
+  static final String DATABASE =
+    System.getProperty("sqoop.test.postgresql.database", "sqooptest");
+  static final String USERNAME =
+    System.getProperty("sqoop.test.postgresql.username", "sqooptest");
+  static final String CONNECT_STRING = HOST_URL + DATABASE;
+
+  public DirectPostgreSQLExportManualTest() {
+    JobConf conf = new JobConf(getConf());
+    DBConfiguration.configureDB(conf,
+                                "org.postgresql.Driver",
+                                getConnectString(),
+                                getUserName(),
+                                (String) null, (Integer) null);
+    dbConf = new DBConfiguration(conf);
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return CONNECT_STRING;
+  }
+
+  protected String getUserName() {
+    return USERNAME;
+  }
+
+  @Override
+  protected String getTablePrefix() {
+    return super.getTablePrefix().toLowerCase();
+  }
+
+  @Override
+  protected String getTableName() {
+    return super.getTableName().toLowerCase();
+  }
+
+  @Override
+  public String getStagingTableName() {
+    return super.getStagingTableName().toLowerCase();
+  }
+
+  @Override
+  protected Connection getConnection() {
+    try {
+      Connection conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+      PreparedStatement stmt =
+          conn.prepareStatement("SET extra_float_digits TO 0");
+      stmt.executeUpdate();
+      conn.commit();
+      return conn;
+    } catch (SQLException sqlE) {
+      LOG.error("Could not get connection to test server: " + sqlE);
+      return null;
+    } catch (ClassNotFoundException cnfE) {
+      LOG.error("Could not find driver class: " + cnfE);
+      return null;
+    }
+  }
+
+  @Override
+  protected String getDropTableStatement(String tableName) {
+    return "DROP TABLE IF EXISTS " + tableName;
+  }
+
+  @Override
+  protected String[] getArgv(boolean includeHadoopFlags,
+                             int rowsPerStatement,
+                             int statementsPerTx,
+                             String... additionalArgv) {
+    ArrayList<String> args =
+        new ArrayList<String>(Arrays.asList(additionalArgv));
+    args.add("--username");
+    args.add(getUserName());
+    args.add("--direct");
+    return super.getArgv(includeHadoopFlags,
+                         rowsPerStatement,
+                         statementsPerTx,
+                         args.toArray(new String[0]));
+  }
+
+  @Override
+  protected String [] getCodeGenArgv(String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+    args.add("--username");
+    args.add(getUserName());
+    return super.getCodeGenArgv(args.toArray(new String[0]));
+  }
+
+  @Override
+  public void testColumnsExport() throws IOException, SQLException {
+    // Direct export does not support --columns option.
+  }
+}