Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/12/14 21:40:13 UTC
sqoop git commit: SQOOP-1882: JobManager currently ignores the TO connector IDF and assumed all IDF use String for the generic T
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 9df8a53dd -> 452791676
SQOOP-1882: JobManager currently ignores the TO connector IDF and assumed all IDF use String for the generic T
(Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/45279167
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/45279167
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/45279167
Branch: refs/heads/sqoop2
Commit: 4527916766bfadf10654d0c694db470ef4641724
Parents: 9df8a53
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Dec 14 12:39:41 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sun Dec 14 12:39:41 2014 -0800
----------------------------------------------------------------------
.../idf/CSVIntermediateDataFormat.java | 26 -------------
.../connector/idf/IntermediateDataFormat.java | 6 +--
.../idf/TestCSVIntermediateDataFormat.java | 2 +-
.../org/apache/sqoop/driver/JobManager.java | 35 +++++++++--------
.../org/apache/sqoop/driver/JobRequest.java | 25 ++++++++----
.../mapreduce/MapreduceExecutionEngine.java | 7 +++-
.../org/apache/sqoop/job/MRJobConstants.java | 7 +++-
.../org/apache/sqoop/job/io/SqoopWritable.java | 23 ++++++-----
.../org/apache/sqoop/job/mr/SqoopMapper.java | 40 ++++++++++----------
.../job/mr/SqoopOutputFormatLoadExecutor.java | 25 ++++++------
.../org/apache/sqoop/job/TestMapReduce.java | 11 ++++--
.../java/org/apache/sqoop/job/TestMatching.java | 4 +-
.../mr/TestSqoopOutputFormatLoadExecutor.java | 10 ++---
13 files changed, 113 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
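Editor's note on the shape of the change: the core of SQOOP-1882 is that JobRequest now tracks one IntermediateDataFormat (IDF) class per direction instead of a single class silently shared by both connectors. A minimal sketch of the new call pattern, assuming fromConnector and toConnector are already-resolved SqoopConnector instances (all names as they appear in the diff below):

    // Register each connector's own IDF class, keyed by direction.
    jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM);
    jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO);

    // Later, callers ask for the IDF of the side they are working on.
    Class<? extends IntermediateDataFormat<?>> fromIDF =
        jobRequest.getIntermediateDataFormat(Direction.FROM);
    Class<? extends IntermediateDataFormat<?>> toIDF =
        jobRequest.getIntermediateDataFormat(Direction.TO);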
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index dbe193d..275321a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -23,24 +23,16 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-import org.json.simple.parser.JSONParser;
import java.io.DataInput;
import java.io.DataOutput;
@@ -319,20 +311,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
return data.equals(((CSVIntermediateDataFormat) other).data);
}
- public int compareTo(IntermediateDataFormat<?> o) {
- if (this == o) {
- return 0;
- }
- if (this.equals(o)) {
- return 0;
- }
- if (!(o instanceof CSVIntermediateDataFormat)) {
- throw new IllegalStateException("Expected Data to be instance of "
- + "CSVIntermediateFormat, but was an instance of " + o.getClass().getName());
- }
- return data.compareTo(o.getCSVTextData());
- }
-
/**
* Encode to the sqoop prescribed CSV String for every element in the object array
* @param objectArray
@@ -392,10 +370,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
}
}
- public String toString() {
- return data;
- }
-
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 93698a8..b8c8042 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -63,10 +63,10 @@ public abstract class IntermediateDataFormat<T> {
* Set one row of data. If validate is set to true, the data is validated
* against the schema.
*
- * @param data - A single row of data to be moved.
+ * @param obj - A single row of data to be moved.
*/
- public void setData(T data) {
- this.data = data;
+ public void setData(T obj) {
+ this.data = obj;
}
/**
* Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 83a95ec..f6852a0 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -51,7 +51,7 @@ import org.junit.Test;
public class TestCSVIntermediateDataFormat {
- private IntermediateDataFormat<?> dataFormat;
+ private CSVIntermediateDataFormat dataFormat;
@Before
public void setUp() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 01073d4..ff263ae 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -332,7 +332,6 @@ public class JobManager implements Reconfigurable {
.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);
-
// Create a job request for submit/execution
JobRequest jobRequest = executionEngine.createJobRequest();
// Save important variables to the job request
@@ -350,19 +349,23 @@ public class JobManager implements Reconfigurable {
jobRequest.setJobName(job.getName());
jobRequest.setJobId(job.getPersistenceId());
jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
- Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector
- .getIntermediateDataFormat();
- jobRequest.setIntermediateDataFormat(dataFormatClass);
+ jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM);
+ jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO);
jobRequest.setFrom(fromConnector.getFrom());
jobRequest.setTo(toConnector.getTo());
// set all the jars
addStandardJars(jobRequest);
- addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+ addConnectorClass(jobRequest, fromConnector);
+ addConnectorClass(jobRequest, toConnector);
+ addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat());
+ addConnectorIDFClass(jobRequest, toConnector.getIntermediateDataFormat());
+
addConnectorInitializerJars(jobRequest, Direction.FROM);
addConnectorInitializerJars(jobRequest, Direction.TO);
- addIDFJars(jobRequest);
+ addIDFDependentJars(jobRequest, Direction.FROM);
+ addIDFDependentJars(jobRequest, Direction.TO);
// call the initialize method
initializeConnector(jobRequest, Direction.FROM);
@@ -375,11 +378,12 @@ public class JobManager implements Reconfigurable {
return jobRequest;
}
- private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
- SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
- jobRequest.addJarForClass(fromConnector.getClass());
- jobRequest.addJarForClass(toConnector.getClass());
- jobRequest.addJarForClass(dataFormatClass);
+ private void addConnectorClass(final JobRequest jobRequest, final SqoopConnector connector) {
+ jobRequest.addJarForClass(connector.getClass());
+ }
+
+ private void addConnectorIDFClass(final JobRequest jobRequest, Class<? extends IntermediateDataFormat<?>> idfClass) {
+ jobRequest.addJarForClass(idfClass);
}
private void addStandardJars(JobRequest jobRequest) {
@@ -455,11 +459,10 @@ public class JobManager implements Reconfigurable {
jobRequest.getJobConfig(direction));
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- // TODO:SQOOP-1882 , should add the FROM and TO connector IDF jars
- private void addIDFJars(JobRequest jobRequest) {
- Class<? extends IntermediateDataFormat> idfClass = jobRequest.getIntermediateDataFormat();
- IntermediateDataFormat idf = (IntermediateDataFormat) ClassUtils.instantiate(idfClass);
+ @SuppressWarnings("unchecked")
+ private void addIDFDependentJars(JobRequest jobRequest, Direction direction) {
+ Class<? extends IntermediateDataFormat<?>> idfClass = jobRequest.getIntermediateDataFormat(direction);
+ IntermediateDataFormat<?> idf = ((IntermediateDataFormat<?>) ClassUtils.instantiate(idfClass));
jobRequest.addJars(idf.getJars());
}
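Note that the reworked addIDFDependentJars runs once per direction, so a TO-side IDF that declares its own libraries now gets them on the MR classpath as well. The two calls in the diff above are equivalent to this consolidated sketch (same names and calls as the diff; only the loop is illustrative):

    // Collect each direction's IDF-declared jars onto the job request.
    for (Direction direction : new Direction[] { Direction.FROM, Direction.TO }) {
      Class<? extends IntermediateDataFormat<?>> idfClass =
          jobRequest.getIntermediateDataFormat(direction);
      IntermediateDataFormat<?> idf =
          (IntermediateDataFormat<?>) ClassUtils.instantiate(idfClass);
      jobRequest.addJars(idf.getJars()); // each IDF declares its own dependent jars
    }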
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index 8c1cc95..c9377a7 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -27,8 +27,10 @@ import org.apache.sqoop.job.etl.Transferable;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
/**
* Submission details class is used when creating new submission and contains
@@ -111,9 +113,14 @@ public class JobRequest {
Integer loaders;
/**
- * The intermediate data format this submission should use.
+ * The intermediate data format this submission should use to read/extract.
*/
- Class<? extends IntermediateDataFormat> intermediateDataFormat;
+ Class<? extends IntermediateDataFormat<?>> fromIDF;
+
+ /**
+ * The intermediate data format this submission should use to write/load.
+ */
+ Class<? extends IntermediateDataFormat<?>> toIDF;
public JobRequest() {
this.jars = new LinkedList<String>();
@@ -191,7 +198,7 @@ public class JobRequest {
}
}
- public void addJarForClass(Class klass) {
+ public void addJarForClass(Class<?> klass) {
addJar(ClassUtils.jarForClass(klass));
}
@@ -318,12 +325,16 @@ public class JobRequest {
this.loaders = loaders;
}
- public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
- return intermediateDataFormat;
+ public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat(Direction direction) {
+ return direction.equals(Direction.FROM) ? fromIDF : toIDF;
}
- public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
- this.intermediateDataFormat = intermediateDataFormat;
+ public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat<? extends Object>> intermediateDataFormat, Direction direction) {
+ if (direction.equals(Direction.FROM)) {
+ fromIDF = intermediateDataFormat;
+ } else if (direction.equals(Direction.TO)) {
+ toIDF = intermediateDataFormat;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 9b3eb44..3f79325 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.execution.mapreduce;
import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.driver.ExecutionEngine;
import org.apache.sqoop.driver.JobRequest;
@@ -69,8 +70,10 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
- context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
- mrJobRequest.getIntermediateDataFormat().getName());
+ context.setString(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT,
+ mrJobRequest.getIntermediateDataFormat(Direction.FROM).getName());
+ context.setString(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
+ mrJobRequest.getIntermediateDataFormat(Direction.TO).getName());
if(mrJobRequest.getExtractors() != null) {
context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
index 67021a8..b7aa8c6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -72,8 +72,11 @@ public final class MRJobConstants extends Constants {
public static final String HADOOP_COMPRESS_CODEC =
"mapred.output.compression.codec";
- public static final String INTERMEDIATE_DATA_FORMAT =
- DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+ public static final String FROM_INTERMEDIATE_DATA_FORMAT =
+ DriverConstants.PREFIX_EXECUTION_CONFIG + "from.intermediate.format";
+
+ public static final String TO_INTERMEDIATE_DATA_FORMAT =
+ DriverConstants.PREFIX_EXECUTION_CONFIG + "to.intermediate.format";
private MRJobConstants() {
// Disable explicit object creation
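Downstream MR code now reads two configuration keys instead of one. Configuring a job looks like this (the updated tests further below do exactly the same):

    Configuration conf = new Configuration();
    conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
    conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
    // Given PREFIX_EXECUTION_CONFIG, the resulting property names end in
    // "from.intermediate.format" and "to.intermediate.format" respectively.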
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
index 336ab97..ed182cb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -21,7 +21,6 @@ package org.apache.sqoop.job.io;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
-
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.utils.ClassUtils;
@@ -30,8 +29,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+/**
* Writable used to load the data to the {@link Transferable} entity TO
+ */
+
public class SqoopWritable implements Configurable, WritableComparable<SqoopWritable> {
- private IntermediateDataFormat<?> dataFormat;
+ private IntermediateDataFormat<?> toIDF;
private Configuration conf;
public SqoopWritable() {
@@ -39,22 +42,22 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit
}
public SqoopWritable(IntermediateDataFormat<?> dataFormat) {
- this.dataFormat = dataFormat;
+ this.toIDF = dataFormat;
}
public void setString(String data) {
- this.dataFormat.setCSVTextData(data);
+ this.toIDF.setCSVTextData(data);
}
- public String getString() { return dataFormat.getCSVTextData(); }
+ public String getString() { return toIDF.getCSVTextData(); }
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(dataFormat.getCSVTextData());
+ out.writeUTF(toIDF.getCSVTextData());
}
@Override
- public void readFields(DataInput in) throws IOException { dataFormat.setCSVTextData(in.readUTF()); }
+ public void readFields(DataInput in) throws IOException { toIDF.setCSVTextData(in.readUTF()); }
@Override
public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); }
@@ -68,9 +71,9 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit
public void setConf(Configuration conf) {
this.conf = conf;
- if (dataFormat == null) {
- String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
- this.dataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(intermediateDataFormatName);
+ if (toIDF == null) {
+ String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
+ this.toIDF = (IntermediateDataFormat<?>) ClassUtils.instantiate(toIDFClass);
}
}
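SqoopWritable now wraps the TO-side IDF and (de)serializes through its CSV text form. A round-trip sketch, assuming the IDF accepts raw CSV text without further setup (the SqoopMapper diff below sets a schema first in the real flow):

    IntermediateDataFormat<?> toIDF = new CSVIntermediateDataFormat();
    SqoopWritable writable = new SqoopWritable(toIDF);
    writable.setString("1,'a'");        // delegates to toIDF.setCSVTextData(...)
    // write(out) emits toIDF.getCSVTextData() via out.writeUTF(...), and
    // readFields(in) restores it with toIDF.setCSVTextData(in.readUTF()).
    String csv = writable.getString();  // "1,'a'"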
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7434243..dee0011 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -56,8 +56,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
- private IntermediateDataFormat<String> fromDataFormat = null;
- private IntermediateDataFormat<String> toDataFormat = null;
+ private IntermediateDataFormat<Object> fromIDF = null;
+ private IntermediateDataFormat<Object> toIDF = null;
private Matcher matcher;
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -72,11 +72,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
matcher = MatcherFactory.getMatcher(fromSchema, toSchema);
- String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
- fromDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
- fromDataFormat.setSchema(matcher.getFromSchema());
- toDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
- toDataFormat.setSchema(matcher.getToSchema());
+ String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT);
+ fromIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass);
+ fromIDF.setSchema(matcher.getFromSchema());
+ String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
+ toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass);
+ toIDF.setSchema(matcher.getToSchema());
// Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
@@ -107,45 +108,46 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
}
// There are two IDF objects we carry around in memory during the sqoop job execution.
- // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it.
- // Before we do the writing to the toDatFormat object we do the matching process to negotiate between
- // the two schemas and their corresponding column types before we write the data to the toDataFormat object
+ // The fromIDF has the fromSchema in it, the toIDF has the toSchema in it.
+ // Before we do the writing to the toIDF object we do the matching process to negotiate between
+ // the two schemas and their corresponding column types before we write the data to the toIDF object
private class SqoopMapDataWriter extends DataWriter {
private Context context;
private SqoopWritable writable;
public SqoopMapDataWriter(Context context) {
this.context = context;
- this.writable = new SqoopWritable(toDataFormat);
+ this.writable = new SqoopWritable(toIDF);
}
@Override
public void writeArrayRecord(Object[] array) {
- fromDataFormat.setObjectData(array);
+ fromIDF.setObjectData(array);
writeContent();
}
@Override
public void writeStringRecord(String text) {
- fromDataFormat.setCSVTextData(text);
+ fromIDF.setCSVTextData(text);
writeContent();
}
@Override
public void writeRecord(Object obj) {
- fromDataFormat.setData(obj.toString());
+ fromIDF.setData(obj);
writeContent();
}
private void writeContent() {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Extracted data: " + fromDataFormat.getCSVTextData());
+ LOG.debug("Extracted data: " + fromIDF.getCSVTextData());
}
- // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process
- // The output of the mappers is finally written to the toDataFormat object after the matching process
- // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat)
- toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
+ // NOTE: The fromIDF and the corresponding fromSchema is used only for the matching process
+ // The output of the mappers is finally written to the toIDF object after the matching process
+ // since the writable encapsulates the toIDF ==> new SqoopWritable(toIDF)
+ toIDF.setObjectData(matcher.getMatchingData(fromIDF.getObjectData()));
+ // NOTE: We do not use the reducer to do the writing (a.k.a LOAD in ETL). Hence the mapper sets up the writable
context.write(writable, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
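The mapper-side flow the NOTE comments describe, in outline: the extractor writes into fromIDF (as an object array, CSV text, or raw object), the Matcher reconciles the FROM and TO schemas, and the matched object array lands in toIDF, which the SqoopWritable wraps for output. A condensed sketch of writeContent (names as in the diff above):

    // fromIDF holds the extracted record under the FROM schema.
    Object[] extracted = fromIDF.getObjectData();
    // The matcher negotiates column types and order between the two schemas.
    toIDF.setObjectData(matcher.getMatchingData(extracted));
    // No reducer does the load; the mapper emits the writable wrapping toIDF.
    context.write(new SqoopWritable(toIDF), NullWritable.get());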
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index d664337..aaf771c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.*;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
@@ -53,7 +53,7 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
- private volatile IntermediateDataFormat<String> dataFormat;
+ private volatile IntermediateDataFormat<? extends Object> toDataFormat;
private Matcher matcher;
private JobContext context;
private SqoopRecordWriter writer;
@@ -63,11 +63,11 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean isTest = false;
private String loaderName;
- // NOTE: This method is only exposed for test cases and hence assumes CSVIntermediateDataFormat
- SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
+ // NOTE: This method is only exposed for test cases
+ SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, IntermediateDataFormat<?> idf) {
this.isTest = isTest;
this.loaderName = loaderName;
- dataFormat = new CSVIntermediateDataFormat();
+ toDataFormat = idf;
writer = new SqoopRecordWriter();
matcher = null;
}
@@ -78,10 +78,10 @@ public class SqoopOutputFormatLoadExecutor {
matcher = MatcherFactory.getMatcher(
MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
- dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
- .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
+ toDataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(context
+ .getConfiguration().get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT));
// Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
- dataFormat.setSchema(matcher.getToSchema());
+ toDataFormat.setSchema(matcher.getToSchema());
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
@@ -102,7 +102,7 @@ public class SqoopOutputFormatLoadExecutor {
free.acquire();
checkIfConsumerThrew();
// NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
- dataFormat.setCSVTextData(key.getString());
+ toDataFormat.setCSVTextData(key.getString());
filled.release();
}
@@ -158,7 +158,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return dataFormat.getObjectData();
+ return toDataFormat.getObjectData();
} finally {
releaseSema();
}
@@ -172,7 +172,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return dataFormat.getCSVTextData();
+ return toDataFormat.getCSVTextData();
} finally {
releaseSema();
}
@@ -185,7 +185,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return dataFormat.getData();
+ return toDataFormat.getData();
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
@@ -215,6 +215,7 @@ public class SqoopOutputFormatLoadExecutor {
private class ConsumerThread implements Runnable {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 256c34d..47696cc 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -67,7 +67,9 @@ public class TestMapReduce {
public void testSqoopInputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
Job job = new Job(conf);
SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -86,7 +88,9 @@ public class TestMapReduce {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
Job job = new Job(conf);
// from and to have the same schema in this test case
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
@@ -106,7 +110,8 @@ public class TestMapReduce {
conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
// from and to have the same schema in this test case
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 67c8525..1692ddb 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -128,8 +128,8 @@ public class TestMatching {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
- CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index d897125..ec0e886 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -121,7 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Before
public void setUp() {
conf = new Configuration();
- conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
}
@@ -129,7 +129,7 @@ public class TestSqoopOutputFormatLoadExecutor {
public void testWhenLoaderThrows() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
- SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
+ SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -147,7 +147,7 @@ public class TestSqoopOutputFormatLoadExecutor {
public void testSuccessfulContinuousLoader() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
- SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
+ SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -168,7 +168,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Test (expected = SqoopException.class)
public void testSuccessfulLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
- SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
+ SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -192,7 +192,7 @@ public class TestSqoopOutputFormatLoadExecutor {
public void testThrowingContinuousLoader() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
- SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
+ SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);