You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/11/27 01:27:08 UTC
git commit: SQOOP-382: Connection parameters should be used on the mapper
Updated Branches:
refs/heads/trunk 95098af0e -> 817195ebb
SQOOP-382: Connection parameters should be used on the mapper
(David Robson via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/817195eb
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/817195eb
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/817195eb
Branch: refs/heads/trunk
Commit: 817195ebb0bd54c9a81f1f1960858f0b495e63bd
Parents: 95098af
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Nov 26 16:26:18 2012 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Nov 26 16:26:18 2012 -0800
----------------------------------------------------------------------
.../sqoop/mapreduce/DataDrivenImportJob.java | 5 +-
.../org/apache/sqoop/mapreduce/JdbcExportJob.java | 6 +-
.../sqoop/mapreduce/JdbcUpdateExportJob.java | 6 +-
.../sqoop/mapreduce/JdbcUpsertExportJob.java | 6 +-
.../apache/sqoop/mapreduce/MySQLDumpImportJob.java | 5 +-
.../org/apache/sqoop/mapreduce/MySQLExportJob.java | 5 +-
.../sqoop/mapreduce/PGBulkloadExportJob.java | 6 +-
.../apache/sqoop/mapreduce/db/DBConfiguration.java | 161 ++++++++++++++-
.../sqoop/manager/PGBulkloadManagerManualTest.java | 2 +-
.../mapreduce/db/TestDataDrivenDBInputFormat.java | 2 +-
.../sqoop/mapreduce/db/TestDBConfiguration.java | 61 ++++++
11 files changed, 238 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 9e5f102..ef1d363 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -150,11 +150,12 @@ public class DataDrivenImportJob extends ImportJobBase {
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(), options.getConnectString(),
- options.getFetchSize());
+ options.getFetchSize(), options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(), options.getConnectString(),
- username, options.getPassword(), options.getFetchSize());
+ username, options.getPassword(), options.getFetchSize(),
+ options.getConnectionParams());
}
if (null != tableName) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index bd52f00..7c52110 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -115,12 +115,14 @@ public class JdbcExportJob extends ExportJobBase {
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
- options.getConnectString());
+ options.getConnectString(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
- username, options.getPassword());
+ username, options.getPassword(),
+ options.getConnectionParams());
}
String [] colNames = options.getColumns();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 21cb128..8fa420e 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -83,12 +83,14 @@ public class JdbcUpdateExportJob extends ExportJobBase {
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
- options.getConnectString());
+ options.getConnectString(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
- username, options.getPassword());
+ username, options.getPassword(),
+ options.getConnectionParams());
}
String [] colNames = options.getColumns();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index c17b4bb..0a9bf7f 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -56,12 +56,14 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
- options.getConnectString());
+ options.getConnectString(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
- username, options.getPassword());
+ username, options.getPassword(),
+ options.getConnectionParams());
}
String [] colNames = options.getColumns();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
index 634bd34..43fbec4 100644
--- a/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
@@ -63,11 +63,12 @@ public class MySQLDumpImportJob extends ImportJobBase {
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(), options.getConnectString());
+ mgr.getDriverClass(), options.getConnectString(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(), options.getConnectString(), username,
- options.getPassword());
+ options.getPassword(), options.getConnectionParams());
}
String [] colNames = options.getColumns();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
index 0523901..16bdd74 100644
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
@@ -73,11 +73,12 @@ public class MySQLExportJob extends ExportJobBase {
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(), options.getConnectString());
+ mgr.getDriverClass(), options.getConnectString(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(), options.getConnectString(), username,
- options.getPassword());
+ options.getPassword(), options.getConnectionParams());
}
String [] colNames = options.getColumns();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
index f3f094b..cc60233 100644
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
@@ -72,13 +72,15 @@ public class PGBulkloadExportJob extends ExportJobBase {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
- options.getFetchSize());
+ options.getFetchSize(),
+ options.getConnectionParams());
} else {
DBConfiguration.configureDB(job.getConfiguration(),
mgr.getDriverClass(),
options.getConnectString(),
username, options.getPassword(),
- options.getFetchSize());
+ options.getFetchSize(),
+ options.getConnectionParams());
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
index 22993df..d270bc8 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
@@ -20,8 +20,15 @@ package org.apache.sqoop.mapreduce.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.text.StrTokenizer;
import org.apache.sqoop.mapreduce.DBWritable;
import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;
@@ -55,6 +62,10 @@ public class DBConfiguration {
/** Password to access the database. */
public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
+ /** JDBC connection parameters. */
+ public static final String CONNECTION_PARAMS_PROPERTY =
+ "mapreduce.jdbc.params";
+
/** Fetch size. */
public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize";
@@ -109,9 +120,11 @@ public class DBConfiguration {
* @param userName DB access username
* @param passwd DB access passwd
* @param fetchSize DB fetch size
+ * @param connectionParams JDBC connection parameters
*/
public static void configureDB(Configuration conf, String driverClass,
- String dbUrl, String userName, String passwd, Integer fetchSize) {
+ String dbUrl, String userName, String passwd, Integer fetchSize,
+ Properties connectionParams) {
conf.set(DRIVER_CLASS_PROPERTY, driverClass);
conf.set(URL_PROPERTY, dbUrl);
@@ -124,6 +137,67 @@ public class DBConfiguration {
if (fetchSize != null) {
conf.setInt(FETCH_SIZE, fetchSize);
}
+ if (connectionParams != null) {
+ conf.set(CONNECTION_PARAMS_PROPERTY,
+ propertiesToString(connectionParams));
+ }
+ }
+
+ /**
+ * Sets the DB access related fields in the JobConf.
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL
+ * @param fetchSize DB fetch size
+ * @param connectionParams JDBC connection parameters
+ */
+ public static void configureDB(Configuration job, String driverClass,
+ String dbUrl, Integer fetchSize, Properties connectionParams) {
+ configureDB(job, driverClass, dbUrl, null, null, fetchSize,
+ connectionParams);
+ }
+
+ /**
+ * Sets the DB access related fields in the {@link Configuration}.
+ * @param conf the configuration
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL
+ * @param userName DB access username
+ * @param passwd DB access passwd
+ * @param connectionParams JDBC connection parameters
+ */
+ public static void configureDB(Configuration conf, String driverClass,
+ String dbUrl, String userName, String passwd,
+ Properties connectionParams) {
+ configureDB(conf, driverClass, dbUrl, userName, passwd, null,
+ connectionParams);
+ }
+
+ /**
+ * Sets the DB access related fields in the JobConf.
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ * @param connectionParams JDBC connection parameters
+ */
+ public static void configureDB(Configuration job, String driverClass,
+ String dbUrl, Properties connectionParams) {
+ configureDB(job, driverClass, dbUrl, null, connectionParams);
+ }
+
+ /**
+ * Sets the DB access related fields in the {@link Configuration}.
+ * @param conf the configuration
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL
+ * @param userName DB access username
+ * @param passwd DB access passwd
+ * @param fetchSize DB fetch size
+ */
+ public static void configureDB(Configuration conf, String driverClass,
+ String dbUrl, String userName, String passwd, Integer fetchSize) {
+ configureDB(conf, driverClass, dbUrl, userName, passwd, fetchSize,
+ (Properties) null);
}
/**
@@ -135,7 +209,7 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration job, String driverClass,
String dbUrl, Integer fetchSize) {
- configureDB(job, driverClass, dbUrl, null, null, fetchSize);
+ configureDB(job, driverClass, dbUrl, fetchSize, (Properties) null);
}
/**
@@ -148,7 +222,7 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration conf, String driverClass,
String dbUrl, String userName, String passwd) {
- configureDB(conf, driverClass, dbUrl, userName, passwd, null);
+ configureDB(conf, driverClass, dbUrl, userName, passwd, (Properties) null);
}
/**
@@ -159,7 +233,7 @@ public class DBConfiguration {
*/
public static void configureDB(Configuration job, String driverClass,
String dbUrl) {
- configureDB(job, driverClass, dbUrl, null);
+ configureDB(job, driverClass, dbUrl, (Properties) null);
}
@@ -174,18 +248,38 @@ public class DBConfiguration {
* @throws SQLException */
public Connection getConnection()
throws ClassNotFoundException, SQLException {
+ Connection connection;
Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
- if (conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
- return DriverManager.getConnection(
- conf.get(DBConfiguration.URL_PROPERTY));
+ String username = conf.get(DBConfiguration.USERNAME_PROPERTY);
+ String password = conf.get(DBConfiguration.PASSWORD_PROPERTY);
+ String connectString = conf.get(DBConfiguration.URL_PROPERTY);
+ String connectionParamsStr =
+ conf.get(DBConfiguration.CONNECTION_PARAMS_PROPERTY);
+ Properties connectionParams = propertiesFromString(connectionParamsStr);
+
+ if (connectionParams != null && connectionParams.size() > 0) {
+ Properties props = new Properties();
+ if (username != null) {
+ props.put("user", username);
+ }
+
+ if (password != null) {
+ props.put("password", password);
+ }
+
+ props.putAll(connectionParams);
+ connection = DriverManager.getConnection(connectString, props);
} else {
- return DriverManager.getConnection(
- conf.get(DBConfiguration.URL_PROPERTY),
- conf.get(DBConfiguration.USERNAME_PROPERTY),
- conf.get(DBConfiguration.PASSWORD_PROPERTY));
+ if (username == null) {
+ connection = DriverManager.getConnection(connectString);
+ } else {
+ connection = DriverManager.getConnection(
+ connectString, username, password);
+ }
}
+ return connection;
}
public Configuration getConf() {
@@ -306,4 +400,49 @@ public class DBConfiguration {
return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
}
+ /**
+ * Converts connection properties to a String to be passed to the mappers.
+ * @param properties JDBC connection parameters
+ * @return String to be passed to configuration
+ */
+ protected static String propertiesToString(Properties properties) {
+ List<String> propertiesList = new ArrayList<String>(properties.size());
+ for(Entry<Object, Object> property : properties.entrySet()) {
+ String key = StringEscapeUtils.escapeCsv(property.getKey().toString());
+ if (key.equals(property.getKey().toString()) && key.contains("=")) {
+ key = "\"" + key + "\"";
+ }
+ String val = StringEscapeUtils.escapeCsv(property.getValue().toString());
+ if (val.equals(property.getValue().toString()) && val.contains("=")) {
+ val = "\"" + val + "\"";
+ }
+ propertiesList.add(StringEscapeUtils.escapeCsv(key + "=" + val));
+ }
+ return StringUtils.join(propertiesList, ',');
+ }
+
+ /**
+ * Converts a String back to connection parameters.
+ * @param input String from configuration
+ * @return JDBC connection parameters
+ */
+ protected static Properties propertiesFromString(String input) {
+ if (input != null && !input.isEmpty()) {
+ Properties result = new Properties();
+ StrTokenizer propertyTokenizer = StrTokenizer.getCSVInstance(input);
+ StrTokenizer valueTokenizer = StrTokenizer.getCSVInstance();
+ valueTokenizer.setDelimiterChar('=');
+ while (propertyTokenizer.hasNext()){
+ valueTokenizer.reset(propertyTokenizer.nextToken());
+ String[] values = valueTokenizer.getTokenArray();
+ if (values.length==2) {
+ result.put(values[0], values[1]);
+ }
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
index fff35dc..0403614 100644
--- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
@@ -72,7 +72,7 @@ public class PGBulkloadManagerManualTest extends TestExport {
"org.postgresql.Driver",
getConnectString(),
getUserName(),
- null, null);
+ (String) null, (Integer) null);
dbConf = new DBConfiguration(conf);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java b/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
index 6b4214c..fed22b8 100644
--- a/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
+++ b/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
@@ -206,7 +206,7 @@ public class TestDataDrivenDBInputFormat extends TestCase {
job.getConfiguration().setInt("mapreduce.map.tasks", 2);
FileOutputFormat.setOutputPath(job, new Path(OUT_DIR));
DBConfiguration.configureDB(job.getConfiguration(), DRIVER_CLASS,
- DB_URL, null, null);
+ DB_URL, (String) null, (String) null);
DataDrivenDBInputFormat.setInput(job, DateCol.class, DATE_TABLE, null,
COL, COL);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java b/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java
new file mode 100644
index 0000000..cad1004
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db;
+
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+/**
+ * Test aspects of DBConfiguration.
+ */
+public class TestDBConfiguration extends TestCase {
+
+ public void testPropertiesToString() {
+ Properties connParams = new Properties();
+ connParams.setProperty("a", "value-a");
+ connParams.setProperty("b", "value-b");
+ connParams.setProperty("a.b", "value-a.b");
+ connParams.setProperty("a.b.c", "value-a.b.c");
+ connParams.setProperty("aaaaaaaaaa.bbbbbbb.cccccccc", "value-abc");
+ String result = DBConfiguration.propertiesToString(connParams);
+ Properties resultParams = DBConfiguration.propertiesFromString(result);
+ assertEquals("connection params don't match", connParams, resultParams);
+
+ connParams = new Properties();
+ connParams.put("conn.timeout", "3000");
+ connParams.put("conn.buffer_size", "256");
+ connParams.put("conn.dummy", "dummy");
+ connParams.put("conn.foo", "bar");
+ result = DBConfiguration.propertiesToString(connParams);
+ resultParams = DBConfiguration.propertiesFromString(result);
+ assertEquals("connection params don't match", connParams, resultParams);
+
+ connParams = new Properties();
+ connParams.put("user", "ABC");
+ connParams.put("password", "complex\"pass,word\\123");
+ connParams.put("complex\"param,\\name", "dummy");
+ connParams.put("conn.buffer=size", "256");
+ connParams.put("jdbc.property", "a=b");
+ result = DBConfiguration.propertiesToString(connParams);
+ resultParams = DBConfiguration.propertiesFromString(result);
+ assertEquals("connection params don't match", connParams, resultParams);
+ }
+
+}