Posted to commits@sqoop.apache.org by ch...@apache.org on 2013/01/31 00:24:41 UTC

[1/2] git commit: SQOOP-842: Put partition to template in Extractor as well

SQOOP-842: Put partition to template in Extractor as well

(Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/92062d53
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/92062d53
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/92062d53

Branch: refs/heads/sqoop2
Commit: 92062d5343e6318fe703b06e30c52920938818b0
Parents: 03408d5
Author: Cheolsoo Park <ch...@apache.org>
Authored: Wed Jan 30 15:13:18 2013 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Wed Jan 30 15:13:18 2013 -0800

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcImportExtractor.java |    6 +++---
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |   15 ++++++++-------
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |    8 +++++---
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |    2 +-
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../java/org/apache/sqoop/job/etl/Extractor.java   |    6 +++---
 6 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
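
In short, this change adds a third type parameter to the Extractor base
class, so a connector's run() method receives its concrete partition class
directly instead of downcasting from the generic Partition. A minimal
sketch of the resulting pattern (the Foo* classes below are hypothetical
stand-ins, not part of this commit; a real partition class would extend
org.apache.sqoop.job.etl.Partition):

    import org.apache.sqoop.common.ImmutableContext;
    import org.apache.sqoop.job.etl.Extractor;
    import org.apache.sqoop.job.io.DataWriter;

    // Hypothetical stand-ins for a connector's own classes.
    class FooConnectionConfig { String url; }
    class FooJobConfig { String table; }
    class FooPartition { int lo, hi; }

    public class FooExtractor
        extends Extractor<FooConnectionConfig, FooJobConfig, FooPartition> {

      private long rowsRead = 0;

      @Override
      public void run(ImmutableContext context, FooConnectionConfig connection,
          FooJobConfig job, FooPartition partition, DataWriter writer) {
        rowsRead = 0; // getRowsRead() reports only the last call
        // partition already has its concrete type; no downcast needed
        for (int i = partition.lo; i < partition.hi; i++) {
          writer.writeArrayRecord(new Object[] { i });
          rowsRead++;
        }
      }

      @Override
      public long getRowsRead() {
        return rowsRead;
      }
    }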


http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
index f4389a3..9db3328 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
@@ -30,13 +30,13 @@ import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> {
+public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
 
  public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
 
  private long rowsRead = 0;
   @Override
-  public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
     String driver = connection.connection.jdbcDriver;
     String url = connection.connection.connectionString;
     String username = connection.connection.username;
@@ -44,7 +44,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio
     GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
 
     String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
-    String conditions = ((GenericJdbcImportPartition)partition).getConditions();
+    String conditions = partition.getConditions();
     query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
     LOG.info("Using query: " + query);
 

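The effect in this extractor: the unchecked (GenericJdbcImportPartition)
downcast becomes a compile-time guarantee. Handing this extractor any other
partition type now fails at compilation rather than at runtime with a
ClassCastException. For illustration only (run() is normally invoked by the
framework; OtherPartition is hypothetical, and context, connConfig,
jobConfig and writer are assumed to be in scope; compiler message
abbreviated):

    GenericJdbcImportExtractor extractor = new GenericJdbcImportExtractor();
    extractor.run(context, connConfig, jobConfig, new OtherPartition(), writer);
    // error: incompatible types: OtherPartition cannot be converted
    //        to GenericJdbcImportPartition
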
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 3a04e59..45b6166 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -27,12 +27,14 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class HdfsSequenceExportExtractor extends Extractor {
+public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
 
   public static final Log LOG =
     LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
@@ -47,19 +49,18 @@ public class HdfsSequenceExportExtractor extends Extractor {
   }
 
   @Override
-  public void run(ImmutableContext context, Object connectionConfiguration,
-      Object jobConfiguration, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
     writer.setFieldDelimiter(fieldDelimiter);
 
     conf = ((PrefixContext)context).getConfiguration();
     datawriter = writer;
 
     try {
-      HdfsExportPartition p = (HdfsExportPartition)partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
+      LOG.info("Working on partition: " + partition);
+      int numFiles = partition.getNumberOfFiles();
       for (int i=0; i<numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
       }
     } catch (IOException e) {
       throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index e00d428..ed30c91 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -33,12 +33,14 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.DataWriter;
 
-public class HdfsTextExportExtractor extends Extractor {
+public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition>  {
 
   public static final Log LOG =
     LogFactory.getLog(HdfsTextExportExtractor.class.getName());
@@ -53,8 +55,8 @@ public class HdfsTextExportExtractor extends Extractor {
   }
 
   @Override
-  public void run(ImmutableContext context, Object connectionConfiguration,
-      Object jobConfiguration, Partition partition, DataWriter writer) {
+  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
     writer.setFieldDelimiter(fieldDelimiter);
 
     conf = ((PrefixContext)context).getConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 4e6209d..6e1c958 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -217,7 +217,7 @@ public class TestHdfsLoad extends TestCase {
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
         Object[] array = new Object[] {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 8590065..427132e 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -133,7 +133,7 @@ public class TestMapReduce extends TestCase {
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) {
+    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
         writer.writeArrayRecord(new Object[] {

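A note on the two test changes: DummyExtractor extends the raw Extractor
type, and the old partition parameter was the concrete class
org.apache.sqoop.job.etl.Partition. Now that Partition is an unbounded type
variable, it erases to Object in the raw type, so the @Override signatures
must declare Object partition and keep the explicit DummyPartition cast.
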
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index fac6f05..300cf4e 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -24,7 +24,7 @@ import org.apache.sqoop.job.io.DataWriter;
  * This allows connector to extract data from a source system
  * based on each partition.
  */
-public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
+public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
 
   public abstract void run(ImmutableContext context,
                            ConnectionConfiguration connectionConfiguration,
@@ -34,14 +34,14 @@ public abstract class Extractor<ConnectionConfiguration, JobConfiguration> {
 
   /**
    * Return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
    * method. This method returns only the number of rows read in the last call,
    * and not a cumulative total of the number of rows read by this Extractor
    * since its creation. If no calls were made to the run method, this method's
    * behavior is undefined.
    *
    * @return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
    */
   public abstract long getRowsRead();
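
The getRowsRead() javadoc above pins down per-call semantics: the count
covers only the most recent run() invocation, never a cumulative total, and
is undefined before the first call. A hedged illustration from the caller's
side, reusing the hypothetical FooExtractor sketched earlier and assuming
framework-provided context, configs and writer:

    FooExtractor extractor = new FooExtractor();
    FooPartition first = new FooPartition();   // hypothetical partitions
    FooPartition second = new FooPartition();

    extractor.run(context, connConfig, jobConfig, first, writer);
    long n1 = extractor.getRowsRead();   // rows read by the first call

    extractor.run(context, connConfig, jobConfig, second, writer);
    long n2 = extractor.getRowsRead();   // rows read by the second call
                                         // only, not a cumulative total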