You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2013/01/31 00:24:41 UTC
[1/2] git commit: SQOOP-842: Put partition to template in Extractor as well
SQOOP-842: Put partition to template in Extractor as well
(Jarcec Cecho via Cheolsoo Park)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/92062d53
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/92062d53
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/92062d53
Branch: refs/heads/sqoop2
Commit: 92062d5343e6318fe703b06e30c52920938818b0
Parents: 03408d5
Author: Cheolsoo Park <ch...@apache.org>
Authored: Wed Jan 30 15:13:18 2013 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Wed Jan 30 15:13:18 2013 -0800
----------------------------------------------------------------------
.../connector/jdbc/GenericJdbcImportExtractor.java | 6 +++---
.../sqoop/job/etl/HdfsSequenceExportExtractor.java | 15 ++++++++-------
.../sqoop/job/etl/HdfsTextExportExtractor.java | 8 +++++---
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 2 +-
.../java/org/apache/sqoop/job/TestMapReduce.java | 2 +-
.../java/org/apache/sqoop/job/etl/Extractor.java | 6 +++---
6 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
index f4389a3..9db3328 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
@@ -30,13 +30,13 @@ import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.io.DataWriter;
-public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> {
+public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
private long rowsRead = 0;
@Override
- public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) {
+ public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
String driver = connection.connection.jdbcDriver;
String url = connection.connection.connectionString;
String username = connection.connection.username;
@@ -44,7 +44,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio
GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
- String conditions = ((GenericJdbcImportPartition)partition).getConditions();
+ String conditions = partition.getConditions();
query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
LOG.info("Using query: " + query);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 3a04e59..45b6166 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -27,12 +27,14 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
-public class HdfsSequenceExportExtractor extends Extractor {
+public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
public static final Log LOG =
LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
@@ -47,19 +49,18 @@ public class HdfsSequenceExportExtractor extends Extractor {
}
@Override
- public void run(ImmutableContext context, Object connectionConfiguration,
- Object jobConfiguration, Partition partition, DataWriter writer) {
+ public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+ ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();
datawriter = writer;
try {
- HdfsExportPartition p = (HdfsExportPartition)partition;
- LOG.info("Working on partition: " + p);
- int numFiles = p.getNumberOfFiles();
+ LOG.info("Working on partition: " + partition);
+ int numFiles = partition.getNumberOfFiles();
for (int i=0; i<numFiles; i++) {
- extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+ extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
}
} catch (IOException e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index e00d428..ed30c91 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -33,12 +33,14 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
-public class HdfsTextExportExtractor extends Extractor {
+public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
public static final Log LOG =
LogFactory.getLog(HdfsTextExportExtractor.class.getName());
@@ -53,8 +55,8 @@ public class HdfsTextExportExtractor extends Extractor {
}
@Override
- public void run(ImmutableContext context, Object connectionConfiguration,
- Object jobConfiguration, Partition partition, DataWriter writer) {
+ public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+ ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
writer.setFieldDelimiter(fieldDelimiter);
conf = ((PrefixContext)context).getConfiguration();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 4e6209d..6e1c958 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -217,7 +217,7 @@ public class TestHdfsLoad extends TestCase {
public static class DummyExtractor extends Extractor {
@Override
- public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+ public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
Object[] array = new Object[] {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 8590065..427132e 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -133,7 +133,7 @@ public class TestMapReduce extends TestCase {
public static class DummyExtractor extends Extractor {
@Override
- public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+ public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
writer.writeArrayRecord(new Object[] {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index fac6f05..300cf4e 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -24,7 +24,7 @@ import org.apache.sqoop.job.io.DataWriter;
* This allows connector to extract data from a source system
* based on each partition.
*/
-public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
+public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
public abstract void run(ImmutableContext context,
ConnectionConfiguration connectionConfiguration,
@@ -34,14 +34,14 @@ public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
/**
* Return the number of rows read by the last call to
- * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+ * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
* method. This method returns only the number of rows read in the last call,
* and not a cumulative total of the number of rows read by this Extractor
* since its creation. If no calls were made to the run method, this method's
* behavior is undefined.
*
* @return the number of rows read by the last call to
- * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+ * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
*/
public abstract long getRowsRead();