You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/14 23:03:55 UTC
[2/2] git commit: SQOOP-1585: Sqoop2: Prefix mapreduce classes with
MR ( no functionality change)
SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR ( no functionality change)
(Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cb821480
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cb821480
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cb821480
Branch: refs/heads/sqoop2
Commit: cb8214806b4e47dc3ad30d5bff0a42b04a412a06
Parents: 68577fb
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 14 17:03:12 2014 -0400
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 14 17:03:12 2014 -0400
----------------------------------------------------------------------
.../apache/sqoop/job/etl/ExtractorContext.java | 1 -
.../mapreduce/MapreduceExecutionEngine.java | 16 +-
.../java/org/apache/sqoop/job/JobConstants.java | 81 ------
.../org/apache/sqoop/job/MRExecutionError.java | 97 +++++++
.../org/apache/sqoop/job/MRJobConstants.java | 81 ++++++
.../sqoop/job/MapreduceExecutionError.java | 97 -------
.../main/java/org/apache/sqoop/job/io/Data.java | 26 +-
.../apache/sqoop/job/mr/ConfigurationUtils.java | 278 -------------------
.../sqoop/job/mr/MRConfigurationUtils.java | 278 +++++++++++++++++++
.../apache/sqoop/job/mr/ProgressRunnable.java | 45 ---
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 16 +-
.../sqoop/job/mr/SqoopFileOutputFormat.java | 8 +-
.../apache/sqoop/job/mr/SqoopInputFormat.java | 18 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 26 +-
.../sqoop/job/mr/SqoopNullOutputFormat.java | 2 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 28 +-
.../sqoop/job/mr/SqoopProgressRunnable.java | 45 +++
.../org/apache/sqoop/job/mr/SqoopReducer.java | 4 +-
.../org/apache/sqoop/job/mr/SqoopSplit.java | 6 +-
.../org/apache/sqoop/job/TestMapReduce.java | 32 +--
.../java/org/apache/sqoop/job/TestMatching.java | 12 +-
.../apache/sqoop/job/io/SqoopWritableTest.java | 2 +-
.../sqoop/job/mr/TestConfigurationUtils.java | 168 -----------
.../sqoop/job/mr/TestMRConfigurationUtils.java | 168 +++++++++++
.../mr/TestSqoopOutputFormatLoadExecutor.java | 10 +-
.../mapreduce/MapreduceSubmissionEngine.java | 22 +-
26 files changed, 783 insertions(+), 784 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index 3272b56..4875ed0 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Extractor.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 47f8478..9b3eb44 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.driver.ExecutionEngine;
import org.apache.sqoop.driver.JobRequest;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.job.io.SqoopWritable;
@@ -64,16 +64,16 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
From from = (From) mrJobRequest.getFrom();
To to = (To) mrJobRequest.getTo();
MutableMapContext context = mrJobRequest.getDriverContext();
- context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
- context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
- context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
- context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
- context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
- context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
+ context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
+ context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
+ context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+ context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
+ context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
mrJobRequest.getIntermediateDataFormat().getName());
if(mrJobRequest.getExtractors() != null) {
- context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
+ context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
deleted file mode 100644
index 349bb60..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.driver.DriverConstants;
-
-public final class JobConstants extends Constants {
- /**
- * All job related configuration is prefixed with this:
- * <tt>org.apache.sqoop.job.</tt>
- */
- public static final String PREFIX_JOB_CONFIG =
- ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
-
- public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
- + "etl.partitioner";
-
- public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
- + "etl.extractor";
-
- public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
- + "etl.loader";
-
- public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
- + "etl.from.destroyer";
-
- public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
- + "etl.to.destroyer";
-
- public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
- + "mr.output.file";
-
- public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
- + "mr.output.codec";
-
-
- public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
- + "etl.extractor.count";
-
- public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
- PREFIX_JOB_CONFIG + "connector.from.context.";
-
- public static final String PREFIX_CONNECTOR_TO_CONTEXT =
- PREFIX_JOB_CONFIG + "connector.to.context.";
-
- // Hadoop specific constants
- // We're using constants from Hadoop 1. Hadoop 2 has different names, but
- // provides backward compatibility layer for those names as well.
-
- public static final String HADOOP_INPUTDIR = "mapred.input.dir";
-
- public static final String HADOOP_OUTDIR = "mapred.output.dir";
-
- public static final String HADOOP_COMPRESS = "mapred.output.compress";
-
- public static final String HADOOP_COMPRESS_CODEC =
- "mapred.output.compression.codec";
-
- public static final String INTERMEDIATE_DATA_FORMAT =
- DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
-
- private JobConstants() {
- // Disable explicit object creation
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
new file mode 100644
index 0000000..e70b7e2
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ *
+ */
+public enum MRExecutionError implements ErrorCode {
+
+ MAPRED_EXEC_0000("Unknown error"),
+
+ /** Error occurs during job execution. */
+ MAPRED_EXEC_0008("Error occurs during job execution"),
+
+ /** The system was unable to load the specified class. */
+ MAPRED_EXEC_0009("Unable to load the specified class"),
+
+ /** The system was unable to instantiate the specified class. */
+ MAPRED_EXEC_0010("Unable to instantiate the specified class"),
+
+ /** The parameter already exists in the context */
+ MAPRED_EXEC_0011("The parameter already exists in the context"),
+
+ /** The type is not supported */
+ MAPRED_EXEC_0012("The type is not supported"),
+
+ /** Cannot write to the data writer */
+ MAPRED_EXEC_0013("Cannot write to the data writer"),
+
+ /** Cannot read from the data reader */
+ MAPRED_EXEC_0014("Cannot read to the data reader"),
+
+ /** Unable to write data due to interrupt */
+ MAPRED_EXEC_0015("Unable to write data due to interrupt"),
+
+ /** Unable to read data due to interrupt */
+ MAPRED_EXEC_0016("Unable to read data due to interrupt"),
+
+ /** Error occurs during extractor run */
+ MAPRED_EXEC_0017("Error occurs during extractor run"),
+
+ /** Error occurs during loader run */
+ MAPRED_EXEC_0018("Error occurs during loader run"),
+
+ MAPRED_EXEC_0019("Data have not been completely consumed yet"),
+
+ /** The required option has not been set yet */
+ MAPRED_EXEC_0020("The required option has not been set yet"),
+
+ /** Error occurs during partitioner run */
+ MAPRED_EXEC_0021("Error occurs during partitioner run"),
+
+ /** Unable to parse because it is not properly delimited */
+ MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"),
+
+ /** Unknown job type */
+ MAPRED_EXEC_0023("Unknown job type"),
+
+ /** Unsupported output format type found **/
+ MAPRED_EXEC_0024("Unknown output format type"),
+
+ /** Got invalid number of partitions from Partitioner */
+ MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
+
+ ;
+
+ private final String message;
+
+ private MRExecutionError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
new file mode 100644
index 0000000..67021a8
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.driver.DriverConstants;
+
+public final class MRJobConstants extends Constants {
+ /**
+ * All job related configuration is prefixed with this:
+ * <tt>org.apache.sqoop.job.</tt>
+ */
+ public static final String PREFIX_JOB_CONFIG =
+ ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+ public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+ + "etl.partitioner";
+
+ public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+ + "etl.extractor";
+
+ public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+ + "etl.loader";
+
+ public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+ + "etl.from.destroyer";
+
+ public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+ + "etl.to.destroyer";
+
+ public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+ + "mr.output.file";
+
+ public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
+ + "mr.output.codec";
+
+
+ public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
+ + "etl.extractor.count";
+
+ public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
+ PREFIX_JOB_CONFIG + "connector.from.context.";
+
+ public static final String PREFIX_CONNECTOR_TO_CONTEXT =
+ PREFIX_JOB_CONFIG + "connector.to.context.";
+
+ // Hadoop specific constants
+ // We're using constants from Hadoop 1. Hadoop 2 has different names, but
+ // provides backward compatibility layer for those names as well.
+
+ public static final String HADOOP_INPUTDIR = "mapred.input.dir";
+
+ public static final String HADOOP_OUTDIR = "mapred.output.dir";
+
+ public static final String HADOOP_COMPRESS = "mapred.output.compress";
+
+ public static final String HADOOP_COMPRESS_CODEC =
+ "mapred.output.compression.codec";
+
+ public static final String INTERMEDIATE_DATA_FORMAT =
+ DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+
+ private MRJobConstants() {
+ // Disable explicit object creation
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
deleted file mode 100644
index 1dc12d1..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.common.ErrorCode;
-
-/**
- *
- */
-public enum MapreduceExecutionError implements ErrorCode {
-
- MAPRED_EXEC_0000("Unknown error"),
-
- /** Error occurs during job execution. */
- MAPRED_EXEC_0008("Error occurs during job execution"),
-
- /** The system was unable to load the specified class. */
- MAPRED_EXEC_0009("Unable to load the specified class"),
-
- /** The system was unable to instantiate the specified class. */
- MAPRED_EXEC_0010("Unable to instantiate the specified class"),
-
- /** The parameter already exists in the context */
- MAPRED_EXEC_0011("The parameter already exists in the context"),
-
- /** The type is not supported */
- MAPRED_EXEC_0012("The type is not supported"),
-
- /** Cannot write to the data writer */
- MAPRED_EXEC_0013("Cannot write to the data writer"),
-
- /** Cannot read from the data reader */
- MAPRED_EXEC_0014("Cannot read to the data reader"),
-
- /** Unable to write data due to interrupt */
- MAPRED_EXEC_0015("Unable to write data due to interrupt"),
-
- /** Unable to read data due to interrupt */
- MAPRED_EXEC_0016("Unable to read data due to interrupt"),
-
- /** Error occurs during extractor run */
- MAPRED_EXEC_0017("Error occurs during extractor run"),
-
- /** Error occurs during loader run */
- MAPRED_EXEC_0018("Error occurs during loader run"),
-
- MAPRED_EXEC_0019("Data have not been completely consumed yet"),
-
- /** The required option has not been set yet */
- MAPRED_EXEC_0020("The required option has not been set yet"),
-
- /** Error occurs during partitioner run */
- MAPRED_EXEC_0021("Error occurs during partitioner run"),
-
- /** Unable to parse because it is not properly delimited */
- MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"),
-
- /** Unknown job type */
- MAPRED_EXEC_0023("Unknown job type"),
-
- /** Unsupported output format type found **/
- MAPRED_EXEC_0024("Unknown output format type"),
-
- /** Got invalid number of partitions from Partitioner */
- MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
-
- ;
-
- private final String message;
-
- private MapreduceExecutionError(String message) {
- this.message = message;
- }
-
- public String getCode() {
- return name();
- }
-
- public String getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
index 5423b7b..139883e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRExecutionError;
public class Data implements WritableComparable<Data> {
@@ -76,7 +76,7 @@ public class Data implements WritableComparable<Data> {
this.content = content;
break;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -87,7 +87,7 @@ public class Data implements WritableComparable<Data> {
case ARRAY_RECORD:
return parse();
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
}
}
@@ -141,7 +141,7 @@ public class Data implements WritableComparable<Data> {
}
return result;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -156,7 +156,7 @@ public class Data implements WritableComparable<Data> {
readArray(in);
break;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -171,7 +171,7 @@ public class Data implements WritableComparable<Data> {
writeArray(out);
break;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -249,7 +249,7 @@ public class Data implements WritableComparable<Data> {
default:
throw new IOException(
- new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
+ new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
);
}
}
@@ -307,7 +307,7 @@ public class Data implements WritableComparable<Data> {
} else {
throw new IOException(
- new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012,
+ new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
array[i].getClass().getName()
)
);
@@ -351,7 +351,7 @@ public class Data implements WritableComparable<Data> {
return sb.toString();
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -399,7 +399,7 @@ public class Data implements WritableComparable<Data> {
return (Object[])content;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@@ -418,7 +418,7 @@ public class Data implements WritableComparable<Data> {
case FieldTypes.UTF:
if (field.charAt(0) != stringDelimiter ||
field.charAt(field.length()-1) != stringDelimiter) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
list.add(index, unescape(field.substring(1, field.length()-1)));
break;
@@ -426,7 +426,7 @@ public class Data implements WritableComparable<Data> {
case FieldTypes.BIN:
if (field.charAt(0) != '[' ||
field.charAt(field.length()-1) != ']') {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
String[] splits =
field.substring(1, field.length()-1).split(String.valueOf(','));
@@ -474,7 +474,7 @@ public class Data implements WritableComparable<Data> {
break;
default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
}
return ++index;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
deleted file mode 100644
index 0fa07f7..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.json.util.SchemaSerialization;
-import org.apache.sqoop.model.ConfigUtils;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.utils.ClassUtils;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import java.io.InputStream;
-import java.util.Properties;
-
-/**
- * Helper class to store and load various information in/from MapReduce configuration
- * object and JobConf object.
- */
-public final class ConfigurationUtils {
-
- private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
-
- private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
-
- private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
-
- private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
-
- private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = JobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
-
- private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
-
- private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
-
- private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
-
- private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
-
- private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
-
- private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
-
- private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
-
- private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
-
- private static final String MR_JOB_CONFIG_DRIVER_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.driver";
-
- private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
-
- private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
-
- private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
-
- private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
-
- private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
-
-
- /**
- * Persist Connector configuration object for link.
- *
- * @param job MapReduce job object
- * @param obj Configuration object
- */
- public static void setConnectorLinkConfig(Direction type, Job job, Object obj) {
- switch (type) {
- case FROM:
- job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());
- job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
- break;
-
- case TO:
- job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName());
- job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
- break;
- }
- }
-
- /**
- * Persist Connector configuration objects for job.
- *
- * @param job MapReduce job object
- * @param obj Configuration object
- */
- public static void setConnectorJobConfig(Direction type, Job job, Object obj) {
- switch (type) {
- case FROM:
- job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
- job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
- break;
-
- case TO:
- job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
- job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
- break;
- }
- }
-
-
- /**
- * Persist driver configuration object for job.
- *
- * @param job MapReduce job object
- * @param obj Configuration object
- */
- public static void setDriverConfig(Job job, Object obj) {
- job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName());
- job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
- }
-
- /**
- * Persist Connector generated schema.
- *
- * @param type Direction of schema we are persisting
- * @param job MapReduce Job object
- * @param schema Schema
- */
- public static void setConnectorSchema(Direction type, Job job, Schema schema) {
- if(schema != null) {
- String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString();
- switch (type) {
- case FROM:
- job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
- return;
- case TO:
- job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
- return;
- }
- }
- }
-
- /**
- * Retrieve Connector configuration object for connection.
- * @param configuration MapReduce configuration object
- * @return Configuration object
- */
- public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) {
- switch (type) {
- case FROM:
- return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
-
- case TO:
- return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY);
- }
-
- return null;
- }
-
- /**
- * Retrieve Connector configuration object for job.
- *
- * @param configuration MapReduce configuration object
- * @return Configuration object
- */
- public static Object getConnectorJobConfig(Direction type, Configuration configuration) {
- switch (type) {
- case FROM:
- return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY);
-
- case TO:
- return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY);
- }
-
- return null;
- }
-
- /**
- * Retrieve Framework configuration object for job.
- *
- * @param configuration MapReduce configuration object
- * @return Configuration object
- */
- public static Object getDriverConfig(Configuration configuration) {
- return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY);
- }
-
-
-
- /**
- * Retrieve Connector generated schema.
- *
- * @param type The FROM or TO connector
- * @param configuration MapReduce configuration object
- */
- public static Schema getConnectorSchema(Direction type, Configuration configuration) {
- switch (type) {
- case FROM:
- return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
-
- case TO:
- return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
- }
-
- return null;
- }
-
- /**
- * Deserialize schema from JSON encoded bytes.
- *
- * This method is null safe.
- *
- * @param bytes
- * @return
- */
- private static Schema getSchemaFromBytes(byte[] bytes) {
- if(bytes == null) {
- return null;
- }
-
- JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
- return SchemaSerialization.restoreSchema(jsonSchema);
- }
-
- /**
- * Load configuration instance serialized in Hadoop credentials cache.
- *
- * @param configuration JobConf object associated with the job
- * @param classProperty Property with stored configuration class name
- * @param valueProperty Property with stored JSON representation of the
- * configuration object
- * @return New instance with loaded data
- */
- private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
- // Create new instance of configuration class
- Object object = ClassUtils.instantiate(configuration.get(classProperty));
- if(object == null) {
- return null;
- }
-
- String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
-
- // Fill it with JSON data
- ConfigUtils.fillValues(json, object);
-
- // And give it back
- return object;
- }
-
- private ConfigurationUtils() {
- // Instantiation is prohibited
- }
-
- public static void configureLogging() {
- try {
- Properties props = new Properties();
- InputStream resourceAsStream =
- SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties");
- props.load(resourceAsStream);
- PropertyConfigurator.configure(props);
- } catch (Exception e) {
- System.err.println("Encountered exception while configuring logging " +
- "for sqoop: " + e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
new file mode 100644
index 0000000..03a1dec
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.json.util.SchemaSerialization;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Helper class to store and load various information in/from MapReduce configuration
+ * object and JobConf object.
+ */
+public final class MRConfigurationUtils {
+
+ private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
+
+ private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
+
+ private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
+
+ private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
+
+ private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
+
+ private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
+
+ private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
+
+ private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
+
+ private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
+
+ private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
+
+ private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
+
+ private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
+
+ private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
+
+ private static final String MR_JOB_CONFIG_DRIVER_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.driver";
+
+ private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
+
+ private static final String SCHEMA_FROM = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
+
+ private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
+
+ private static final String SCHEMA_TO = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
+
+ private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
+
+
+ /**
+ * Persist Connector configuration object for link.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConnectorLinkConfig(Direction type, Job job, Object obj) {
+ switch (type) {
+ case FROM:
+ job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());
+ job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
+ break;
+
+ case TO:
+ job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName());
+ job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
+ break;
+ }
+ }
+
+ /**
+ * Persist Connector configuration objects for job.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConnectorJobConfig(Direction type, Job job, Object obj) {
+ switch (type) {
+ case FROM:
+ job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+ break;
+
+ case TO:
+ job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+ break;
+ }
+ }
+
+
+ /**
+ * Persist driver configuration object for job.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setDriverConfig(Job job, Object obj) {
+ job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName());
+ job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+ }
+
+ /**
+ * Persist Connector generated schema.
+ *
+ * @param type Direction of schema we are persisting
+ * @param job MapReduce Job object
+ * @param schema Schema
+ */
+ public static void setConnectorSchema(Direction type, Job job, Schema schema) {
+ if(schema != null) {
+ String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString();
+ switch (type) {
+ case FROM:
+ job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
+ return;
+ case TO:
+ job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
+ return;
+ }
+ }
+ }
+
+ /**
+ * Retrieve Connector configuration object for connection.
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
+
+ case TO:
+ return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY);
+ }
+
+ return null;
+ }
+
+ /**
+ * Retrieve Connector configuration object for job.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConnectorJobConfig(Direction type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY);
+
+ case TO:
+ return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY);
+ }
+
+ return null;
+ }
+
+ /**
+ * Retrieve Framework configuration object for job.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getDriverConfig(Configuration configuration) {
+ return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY);
+ }
+
+
+
+ /**
+ * Retrieve Connector generated schema.
+ *
+ * @param type The FROM or TO connector
+ * @param configuration MapReduce configuration object
+ */
+ public static Schema getConnectorSchema(Direction type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
+
+ case TO:
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
+ }
+
+ return null;
+ }
+
+ /**
+ * Deserialize schema from JSON encoded bytes.
+ *
+ * This method is null safe.
+ *
+ * @param bytes
+ * @return
+ */
+ private static Schema getSchemaFromBytes(byte[] bytes) {
+ if(bytes == null) {
+ return null;
+ }
+
+ JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
+ return SchemaSerialization.restoreSchema(jsonSchema);
+ }
+
+ /**
+ * Load configuration instance serialized in Hadoop credentials cache.
+ *
+ * @param configuration JobConf object associated with the job
+ * @param classProperty Property with stored configuration class name
+ * @param valueProperty Property with stored JSON representation of the
+ * configuration object
+ * @return New instance with loaded data
+ */
+ private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
+ // Create new instance of configuration class
+ Object object = ClassUtils.instantiate(configuration.get(classProperty));
+ if(object == null) {
+ return null;
+ }
+
+ String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
+
+ // Fill it with JSON data
+ ConfigUtils.fillValues(json, object);
+
+ // And give it back
+ return object;
+ }
+
+ private MRConfigurationUtils() {
+ // Instantiation is prohibited
+ }
+
+ public static void configureLogging() {
+ try {
+ Properties props = new Properties();
+ InputStream resourceAsStream =
+ SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties");
+ props.load(resourceAsStream);
+ PropertyConfigurator.configure(props);
+ } catch (Exception e) {
+ System.err.println("Encountered exception while configuring logging " +
+ "for sqoop: " + e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
deleted file mode 100644
index 4c2e206..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.log4j.Logger;
-
-
-/**
- * Runnable that will ping mapreduce context about progress.
- */
-public class ProgressRunnable implements Runnable {
-
- public static final Logger LOG = Logger.getLogger(ProgressRunnable.class);
-
- /**
- * Context class that we should use for reporting progress.
- */
- private final TaskInputOutputContext<?,?,?,?> context;
-
- public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
- this.context = ctxt;
- }
-
- @Override
- public void run() {
- LOG.debug("Auto-progress thread reporting progress");
- this.context.progress();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8d2a1da..b385926 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
@@ -47,13 +47,13 @@ public class SqoopDestroyerExecutor {
switch (direction) {
default:
case FROM:
- destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
- prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+ destroyerPropertyName = MRJobConstants.JOB_ETL_FROM_DESTROYER;
+ prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
break;
case TO:
- destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
- prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+ destroyerPropertyName = MRJobConstants.JOB_ETL_TO_DESTROYER;
+ prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
break;
}
@@ -66,11 +66,11 @@ public class SqoopDestroyerExecutor {
// Objects that should be pass to the Destroyer execution
PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
- Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
- Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+ Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+ Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
// Propagate connector schema in every case for now
- Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
+ Schema schema = MRConfigurationUtils.getConnectorSchema(direction, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index ca77e16..f451044 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
/**
@@ -56,13 +56,13 @@ public class SqoopFileOutputFormat
Path filepath = getDefaultWorkFile(context, "");
String filename = filepath.toString();
- conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+ conf.set(MRJobConstants.JOB_MR_OUTPUT_FILE, filename);
boolean isCompressed = getCompressOutput(context);
if (isCompressed) {
String codecname =
- conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
- conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+ conf.get(MRJobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
+ conf.set(MRJobConstants.JOB_MR_OUTPUT_CODEC, codecname);
}
return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 1c1133a..d2cf5e4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
@@ -59,15 +59,15 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+ String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
- PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
- Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
- Object connectorJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
- Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+ PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+ Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+ Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
- long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
+ long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
@@ -80,7 +80,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
}
if(splits.size() > maxPartitions) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025,
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
String.format("Got %d, max was %d", splits.size(), maxPartitions));
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 03d84d4..d31aa20 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,8 +31,8 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
@@ -47,7 +47,7 @@ import org.apache.sqoop.utils.ClassUtils;
public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
static {
- ConfigurationUtils.configureLogging();
+ MRConfigurationUtils.configureLogging();
}
public static final Logger LOG = Logger.getLogger(SqoopMapper.class);
@@ -63,14 +63,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
public void run(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
+ String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
matcher = MatcherFactory.getMatcher(
- ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
- ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
+ MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
+ MRConfigurationUtils.getConnectorSchema(Direction.TO, conf));
- String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
+ String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
fromDataFormat.setSchema(matcher.getFromSchema());
@@ -79,16 +79,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
toDataFormat.setSchema(matcher.getToSchema());
// Objects that should be passed to the Executor execution
- PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
- Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
- Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+ PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+ Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
try {
LOG.info("Starting progress service");
- progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
+ progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
LOG.info("Running extractor class " + extractorName);
extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
@@ -96,7 +96,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
context.getCounter(SqoopCounters.ROWS_READ)
.increment(extractor.getRowsRead());
} catch (Exception e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
} finally {
LOG.info("Stopping progress service");
progressService.shutdown();
@@ -145,7 +145,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
writable.setString(toDataFormat.getTextData());
context.write(writable, NullWritable.get());
} catch (Exception e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 594b5e9..1148c4a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 1ebd3e4..9108981 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -36,8 +36,8 @@ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
@@ -75,10 +75,10 @@ public class SqoopOutputFormatLoadExecutor {
context = jobctx;
writer = new SqoopRecordWriter();
matcher = MatcherFactory.getMatcher(
- ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
- ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
+ MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
+ MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
- .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+ .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
dataFormat.setSchema(matcher.getToSchema());
}
@@ -141,7 +141,7 @@ public class SqoopOutputFormatLoadExecutor {
//In the rare case, it was not a SqoopException
Throwables.propagate(t);
} catch (Exception ex) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
}
}
@@ -186,7 +186,7 @@ public class SqoopOutputFormatLoadExecutor {
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
} finally {
releaseSema();
}
@@ -221,7 +221,7 @@ public class SqoopOutputFormatLoadExecutor {
Configuration conf = null;
if (!isTest) {
conf = context.getConfiguration();
- loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+ loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
}
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
@@ -233,11 +233,11 @@ public class SqoopOutputFormatLoadExecutor {
if (!isTest) {
// Using the TO schema since the IDF returns data in TO schema
- schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+ schema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
- subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
- configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
- configJob = ConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
+ subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+ configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
+ configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
}
// Create loader context
@@ -252,7 +252,7 @@ public class SqoopOutputFormatLoadExecutor {
// Release so that the writer can tell Sqoop something went
// wrong.
free.release();
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
}
// if no exception happens yet and reader finished before writer,
@@ -264,7 +264,7 @@ public class SqoopOutputFormatLoadExecutor {
// Release so that the writer can tell Sqoop something went
// wrong.
free.release();
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
}
// inform writer that reader is finished
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
new file mode 100644
index 0000000..cd4f8b9
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Runnable that will ping mapreduce context about progress.
+ */
+public class SqoopProgressRunnable implements Runnable {
+
+ public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);
+
+ /**
+ * Context class that we should use for reporting progress.
+ */
+ private final TaskInputOutputContext<?,?,?,?> context;
+
+ public SqoopProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctx) {
+ this.context = ctx;
+ }
+
+ @Override
+ public void run() {
+ LOG.debug("Auto-progress thread reporting progress");
+ this.context.progress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
index a55534a..cf023c3 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
static {
- ConfigurationUtils.configureLogging();
+ MRConfigurationUtils.configureLogging();
}
public static final Logger LOG = Logger.getLogger(SqoopReducer.class);
@@ -46,7 +46,7 @@ public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWrit
public void run(Context context) throws IOException, InterruptedException {
try {
LOG.info("Starting progress service");
- progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
+ progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
// Delegating all functionality to our parent
super.run(context);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
index dca4c90..c2f5756 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRExecutionError;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.utils.ClassUtils;
@@ -60,12 +60,12 @@ public class SqoopSplit extends InputSplit implements Writable {
// instantiate Partition object
Class<?> clz = ClassUtils.loadClass(className);
if (clz == null) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, className);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0009, className);
}
try {
partition = (Partition) clz.newInstance();
} catch (Exception e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, className, e);
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0010, className, e);
}
// read Partition object content
partition.readFields(in);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index e3b68e2..6d0dcb4 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -45,7 +45,7 @@ import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -68,8 +68,8 @@ public class TestMapReduce {
@Test
public void testInputFormat() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
- conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
@@ -87,17 +87,17 @@ public class TestMapReduce {
@Test
public void testMapper() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
- conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
Job job = new Job(conf);
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
- ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
@@ -106,20 +106,20 @@ public class TestMapReduce {
@Test
public void testOutputFormat() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
- conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
- conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
Job job = new Job(conf);
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
- ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 7f9a147..665a65b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -42,7 +42,7 @@ import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.schema.Schema;
@@ -123,14 +123,14 @@ public class TestMatching {
@Test
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
- conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
- ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
boolean success = JobUtils.runJob(job.getConfiguration(),
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
index f5742a2..68ce5ed 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -31,7 +31,7 @@ import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
deleted file mode 100644
index 501e32c..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-import org.apache.sqoop.model.ConfigClass;
-import org.apache.sqoop.model.Input;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Text;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Current tests are using mockito to propagate credentials from hadoop Job object
- * to hadoop JobConf object. This implementation was chosen because it's not clear
- * how MapReduce is converting one object to another.
- */
-public class TestConfigurationUtils {
-
- Job job;
- JobConf jobConfSpy;
-
- @Before
- public void setUp() throws Exception {
- setUpHadoopJob();
- setUpHadoopJobConf();
- }
-
- public void setUpHadoopJob() throws Exception {
- job = new Job();
- }
-
- public void setUpHadoopJobConf() throws Exception {
- jobConfSpy = spy(new JobConf(job.getConfiguration()));
- when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials());
- }
-
- @Test
- public void testLinkConfiguration() throws Exception {
- ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
- setUpHadoopJobConf();
- assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
-
- ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
- setUpHadoopJobConf();
- assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
- }
-
- @Test
- public void testJobConfiguration() throws Exception {
- ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
- setUpHadoopJobConf();
- assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
-
- ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
- setUpHadoopJobConf();
- assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
- }
-
- @Test
- public void testDriverConfiguration() throws Exception {
- ConfigurationUtils.setDriverConfig(job, getConfig());
- setUpHadoopJobConf();
- assertEquals(getConfig(), ConfigurationUtils.getDriverConfig(jobConfSpy));
- }
-
- @Test
- public void testConnectorSchema() throws Exception {
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
- assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
-
- ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
- assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
- }
-
- @Test
- public void testConnectorSchemaNull() throws Exception {
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
- assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
-
- ConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
- assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
- }
-
- private Schema getSchema(String name) {
- return new Schema(name).addColumn(new Text("c1"));
- }
-
- private TestConfiguration getConfig() {
- TestConfiguration c = new TestConfiguration();
- c.c.A = "This is secret text!";
- return c;
- }
-
- @ConfigClass
- public static class C {
-
- @Input String A;
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof C)) return false;
-
- C c = (C) o;
-
- if (A != null ? !A.equals(c.A) : c.A != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return A != null ? A.hashCode() : 0;
- }
- }
-
- @ConfigurationClass
- public static class TestConfiguration {
- @Config C c;
-
- public TestConfiguration() {
- c = new C();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof TestConfiguration)) return false;
-
- TestConfiguration config = (TestConfiguration) o;
-
- if (c != null ? !c.equals(config.c) : config.c != null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return c != null ? c.hashCode() : 0;
- }
- }
-}