You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:15 UTC
[23/50] [abbrv] git commit: SQOOP-1488: Sqoop2: From/To: Run both destroyers
SQOOP-1488: Sqoop2: From/To: Run both destroyers
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b04e796f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b04e796f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b04e796f
Branch: refs/heads/sqoop2
Commit: b04e796f01cb659efc55314029fa18cfd80bb16d
Parents: 27fb31d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Sep 21 13:00:46 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:18 2014 -0700
----------------------------------------------------------------------
.../mapreduce/MapreduceExecutionEngine.java | 3 +-
.../java/org/apache/sqoop/job/JobConstants.java | 6 ++--
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 33 ++++++++++++++------
.../sqoop/job/mr/SqoopFileOutputFormat.java | 7 +++--
.../sqoop/job/mr/SqoopNullOutputFormat.java | 7 +++--
.../org/apache/sqoop/job/TestMapReduce.java | 29 +++++++++++++++++
.../jdbc/generic/TableStagedRDBMSTest.java | 15 ++++-----
7 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ef7ff4e..049d183 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -68,7 +68,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
- context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
+ context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+ context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
mrJobRequest.getIntermediateDataFormat().getName());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 4cdb002..542607a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -37,9 +37,11 @@ public final class JobConstants extends Constants {
public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+ "etl.loader";
- public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
- + "etl.destroyer";
+ public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+ + "etl.from.destroyer";
+ public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+ + "etl.to.destroyer";
public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+ "mr.output.file";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index e3af6e1..aecde40 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -40,10 +40,24 @@ public class SqoopDestroyerExecutor {
* @param success True if the job execution was successfull
* @param configuration Configuration object to get destroyer class with context
* and configuration objects.
- * @param propertyName Name of property that holds destroyer class.
+ * @param direction The direction of the Destroyer to execute.
*/
- public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) {
- Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName));
+ public static void executeDestroyer(boolean success, Configuration configuration, Direction direction) {
+ String destroyerPropertyName, prefixPropertyName;
+ switch (direction) {
+ default:
+ case FROM:
+ destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
+ prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+ break;
+
+ case TO:
+ destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
+ prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+ break;
+ }
+
+ Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(destroyerPropertyName));
if(destroyer == null) {
LOG.info("Skipping running destroyer as non was defined.");
@@ -51,16 +65,17 @@ public class SqoopDestroyerExecutor {
}
// Objects that should be pass to the Destroyer execution
- PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
- Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
- Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
+ PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
+ Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+ Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+
+ // Propagate connector schema in every case for now
+ Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
- // TODO(Abe/Gwen): Change to conditional choosing between schemas.
- Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());
- destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
+ destroyer.destroy(destroyerContext, configConnection, configJob);
}
private SqoopDestroyerExecutor() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index 3e2b1c5..ca77e16 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
@@ -84,7 +85,8 @@ public class SqoopFileOutputFormat
super.commitJob(context);
Configuration config = context.getConfiguration();
- SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+ SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+ SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
}
@Override
@@ -92,7 +94,8 @@ public class SqoopFileOutputFormat
super.abortJob(context, state);
Configuration config = context.getConfiguration();
- SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+ SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+ SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index b3461bb..594b5e9 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
@@ -67,7 +68,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
super.commitJob(jobContext);
Configuration config = jobContext.getConfiguration();
- SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+ SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+ SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
}
@Override
@@ -75,7 +77,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
super.abortJob(jobContext, state);
Configuration config = jobContext.getConfiguration();
- SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+ SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+ SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 2dfc487..869c727 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Loader;
@@ -100,6 +103,8 @@ public class TestMapReduce extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+ conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+ conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
@@ -110,6 +115,10 @@ public class TestMapReduce extends TestCase {
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
+
+ // Make sure both destroyers get called.
+ Assert.assertEquals(1, DummyFromDestroyer.count);
+ Assert.assertEquals(1, DummyToDestroyer.count);
}
public static class DummyPartition extends Partition {
@@ -251,4 +260,24 @@ public class TestMapReduce extends TestCase {
}
}
}
+
+ public static class DummyFromDestroyer extends Destroyer {
+
+ public static int count = 0;
+
+ @Override
+ public void destroy(DestroyerContext context, Object o, Object o2) {
+ count++;
+ }
+ }
+
+ public static class DummyToDestroyer extends Destroyer {
+
+ public static int count = 0;
+
+ @Override
+ public void destroy(DestroyerContext context, Object o, Object o2) {
+ count++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b04e796f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index cb782c7..1af0cdc 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -68,15 +68,12 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
runJob(job);
- // @TODO(Abe): Change back after SQOOP-1488
-// assertEquals(0L, provider.rowCount(stageTableName));
-// assertEquals(4L, rowCount());
-// assertRowInCities(1, "USA", "San Francisco");
-// assertRowInCities(2, "USA", "Sunnyvale");
-// assertRowInCities(3, "Czech Republic", "Brno");
-// assertRowInCities(4, "USA", "Palo Alto");
- assertEquals(4L, provider.rowCount(stageTableName));
- assertEquals(0L, rowCount());
+ assertEquals(0L, provider.rowCount(stageTableName));
+ assertEquals(4L, rowCount());
+ assertRowInCities(1, "USA", "San Francisco");
+ assertRowInCities(2, "USA", "Sunnyvale");
+ assertRowInCities(3, "Czech Republic", "Brno");
+ assertRowInCities(4, "USA", "Palo Alto");
// Clean up testing table
provider.dropTable(stageTableName);