Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:15 UTC

[23/50] [abbrv] git commit: SQOOP-1488: Sqoop2: From/To: Run both destroyers

SQOOP-1488: Sqoop2: From/To: Run both destroyers

(Abraham Elmahrek via Jarek Jarcec Cecho)
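In short, the commit replaces the single JOB_ETL_DESTROYER property with direction-specific
JOB_ETL_FROM_DESTROYER and JOB_ETL_TO_DESTROYER constants, and the output formats now invoke the
destroyer executor once per direction. A minimal sketch of the resulting call pattern, condensed
from the SqoopFileOutputFormat/SqoopNullOutputFormat hunks below (the surrounding OutputCommitter
boilerplate is omitted):

    // Sketch only: condensed from the diff below, not the committed code.
    Configuration config = jobContext.getConfiguration();

    // After a successful job, run the destroyer for both sides of the transfer.
    SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
    SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);

    // On abort, the same pair of calls runs with success == false.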


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b04e796f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b04e796f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b04e796f

Branch: refs/heads/sqoop2
Commit: b04e796f01cb659efc55314029fa18cfd80bb16d
Parents: 27fb31d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Sep 21 13:00:46 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:18 2014 -0700

----------------------------------------------------------------------
 .../mapreduce/MapreduceExecutionEngine.java     |  3 +-
 .../java/org/apache/sqoop/job/JobConstants.java |  6 ++--
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    | 33 ++++++++++++++------
 .../sqoop/job/mr/SqoopFileOutputFormat.java     |  7 +++--
 .../sqoop/job/mr/SqoopNullOutputFormat.java     |  7 +++--
 .../org/apache/sqoop/job/TestMapReduce.java     | 29 +++++++++++++++++
 .../jdbc/generic/TableStagedRDBMSTest.java      | 15 ++++-----
 7 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ef7ff4e..049d183 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -68,7 +68,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
     context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
     context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
         mrJobRequest.getIntermediateDataFormat().getName());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 4cdb002..542607a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -37,9 +37,11 @@ public final class JobConstants extends Constants {
   public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
       + "etl.loader";
 
-  public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
-      + "etl.destroyer";
+  public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.from.destroyer";
 
+  public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.to.destroyer";
 
   public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
       + "mr.output.file";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index e3af6e1..aecde40 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -40,10 +40,24 @@ public class SqoopDestroyerExecutor {
    * @param success True if the job execution was successfull
    * @param configuration Configuration object to get destroyer class with context
    *                      and configuration objects.
-   * @param propertyName Name of property that holds destroyer class.
+   * @param direction The direction of the Destroyer to execute.
    */
-  public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) {
-    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName));
+  public static void executeDestroyer(boolean success, Configuration configuration, Direction direction) {
+    String destroyerPropertyName, prefixPropertyName;
+    switch (direction) {
+      default:
+      case FROM:
+        destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
+        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+        break;
+
+      case TO:
+        destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
+        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+        break;
+    }
+
+    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(destroyerPropertyName));
 
     if(destroyer == null) {
       LOG.info("Skipping running destroyer as non was defined.");
@@ -51,16 +65,17 @@ public class SqoopDestroyerExecutor {
     }
 
     // Objects that should be pass to the Destroyer execution
-    PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
-    Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
+    PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
+    Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+    Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+
+    // Propagate connector schema in every case for now
+    Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
 
-    // TODO(Abe/Gwen): Change to conditional choosing between schemas.
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
 
     LOG.info("Executing destroyer class " + destroyer.getClass());
-    destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
+    destroyer.destroy(destroyerContext, configConnection, configJob);
   }
 
   private SqoopDestroyerExecutor() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index 3e2b1c5..ca77e16 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
@@ -84,7 +85,8 @@ public class SqoopFileOutputFormat
       super.commitJob(context);
 
       Configuration config = context.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
     }
 
     @Override
@@ -92,7 +94,8 @@ public class SqoopFileOutputFormat
       super.abortJob(context, state);
 
       Configuration config = context.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index b3461bb..594b5e9 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
@@ -67,7 +68,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
       super.commitJob(jobContext);
 
       Configuration config = jobContext.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
     }
 
     @Override
@@ -75,7 +77,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
       super.abortJob(jobContext, state);
 
       Configuration config = jobContext.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 2dfc487..869c727 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
@@ -100,6 +103,8 @@ public class TestMapReduce extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+    conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
     conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
       CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
@@ -110,6 +115,10 @@ public class TestMapReduce extends TestCase {
     ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
     JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         SqoopNullOutputFormat.class);
+
+    // Make sure both destroyers get called.
+    Assert.assertEquals(1, DummyFromDestroyer.count);
+    Assert.assertEquals(1, DummyToDestroyer.count);
   }
 
   public static class DummyPartition extends Partition {
@@ -251,4 +260,24 @@ public class TestMapReduce extends TestCase {
       }
     }
   }
+
+  public static class DummyFromDestroyer extends Destroyer {
+
+    public static int count = 0;
+
+    @Override
+    public void destroy(DestroyerContext context, Object o, Object o2) {
+      count++;
+    }
+  }
+
+  public static class DummyToDestroyer extends Destroyer {
+
+    public static int count = 0;
+
+    @Override
+    public void destroy(DestroyerContext context, Object o, Object o2) {
+      count++;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index cb782c7..1af0cdc 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -68,15 +68,12 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
 
     runJob(job);
 
-    // @TODO(Abe): Change back after SQOOP-1488
-//    assertEquals(0L, provider.rowCount(stageTableName));
-//    assertEquals(4L, rowCount());
-//    assertRowInCities(1, "USA", "San Francisco");
-//    assertRowInCities(2, "USA", "Sunnyvale");
-//    assertRowInCities(3, "Czech Republic", "Brno");
-//    assertRowInCities(4, "USA", "Palo Alto");
-    assertEquals(4L, provider.rowCount(stageTableName));
-    assertEquals(0L, rowCount());
+    assertEquals(0L, provider.rowCount(stageTableName));
+    assertEquals(4L, rowCount());
+    assertRowInCities(1, "USA", "San Francisco");
+    assertRowInCities(2, "USA", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "Brno");
+    assertRowInCities(4, "USA", "Palo Alto");
 
     // Clean up testing table
     provider.dropTable(stageTableName);