You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/10/19 22:35:48 UTC
git commit: SQOOP-610: Job submission engine for import
Updated Branches:
refs/heads/sqoop2 6cd9e9688 -> 7fbd3ba94
SQOOP-610: Job submission engine for import
(Bilung Lee via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/7fbd3ba9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/7fbd3ba9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/7fbd3ba9
Branch: refs/heads/sqoop2
Commit: 7fbd3ba94b970ca4c2053d3930dbf6d84989213e
Parents: 6cd9e96
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Oct 19 13:28:18 2012 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Oct 19 13:28:18 2012 -0700
----------------------------------------------------------------------
.../java/org/apache/sqoop/job/JobConstants.java | 10 +
.../main/java/org/apache/sqoop/job/JobEngine.java | 37 +++
.../org/apache/sqoop/job/etl/EtlFramework.java | 148 +++++++++++
.../java/org/apache/sqoop/job/etl/EtlOptions.java | 165 ++++++++++++
.../java/org/apache/sqoop/job/mr/MrExecution.java | 153 +++++++++++
.../java/org/apache/sqoop/job/mr/SqoopReducer.java | 35 +++
.../java/org/apache/sqoop/job/TestJobEngine.java | 199 +++++++++++++++
7 files changed, 747 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
index a032c72..2b0ec18 100644
--- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -21,6 +21,16 @@ import org.apache.sqoop.core.ConfigurationConstants;
public final class JobConstants extends Constants {
+ // Metadata constants
+
+ public static final String INPUT_JOB_JOB_TYPE = "inp-job-job-type";
+ public static final String INPUT_JOB_STORAGE_TYPE = "inp-job-storage-type";
+ public static final String INPUT_JOB_FORMAT_TYPE = "inp-job-format-type";
+ public static final String INPUT_JOB_OUTPUT_CODEC = "inp-job-output-codec";
+ public static final String INPUT_JOB_MAX_EXTRACTORS = "inp-job-max-extractors";
+ public static final String INPUT_JOB_MAX_LOADERS = "inp-job-max-loaders";
+
+
/**
* All job related configuration is prefixed with this:
* <tt>org.apache.sqoop.job.</tt>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/JobEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobEngine.java b/core/src/main/java/org/apache/sqoop/job/JobEngine.java
new file mode 100644
index 0000000..fa3e484
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/JobEngine.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.job.etl.EtlFramework;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.mr.MrExecution;
+
+/**
+ * Entry point for executing a Sqoop job.
+ *
+ * Builds the ETL description from the user options and drives a
+ * {@link MrExecution} through its initialize / run / destroy lifecycle.
+ */
+public class JobEngine {
+
+  /**
+   * Executes the job described by the given options.
+   *
+   * Once initialization has succeeded, {@code destroy()} is always invoked,
+   * whether the run itself completes normally or throws.
+   *
+   * @param options the user-supplied job options
+   */
+  public void run(EtlOptions options) {
+    EtlFramework etl = new EtlFramework(options);
+    MrExecution mr = new MrExecution(etl);
+    mr.initialize();
+    try {
+      mr.run();
+    } finally {
+      // Give the destroyer a chance to clean up even when the job fails;
+      // the failure from run() still propagates to the caller.
+      mr.destroy();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
new file mode 100644
index 0000000..ce7f988
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.EtlOptions.FormatType;
+import org.apache.sqoop.job.etl.EtlOptions.JobType;
+import org.apache.sqoop.job.etl.EtlOptions.StorageType;
+
+/**
+ * Holds the set of ETL stage classes that make up one job, chosen from the
+ * job options at construction time.
+ *
+ * For import:
+ * Initializer (connector-defined)
+ * -> Partitioner (connector-defined)
+ * -> Extractor (connector-defined)
+ * -> Loader (framework-defined)
+ * -> Destroyer (connector-defined)
+ *
+ * For export:
+ * Initializer (connector-defined)
+ * -> Partitioner (framework-defined)
+ * -> Extractor (framework-defined)
+ * -> Loader (connector-defined)
+ * -> Destroyer (connector-defined)
+ */
+public class EtlFramework {
+
+  private Class<? extends Initializer> initializer;
+  private Class<? extends Partitioner> partitioner;
+  private Class<? extends Extractor> extractor;
+  private Class<? extends Loader> loader;
+  private Class<? extends Destroyer> destroyer;
+
+  private boolean requireFieldNames;
+  private boolean requireOutputDirectory;
+
+  private EtlOptions options;
+
+  /**
+   * Selects the stage classes according to the job type found in the options.
+   *
+   * @param inputs the job options, including the connector to use
+   * @throws SqoopException if the job, storage or format type is not handled
+   */
+  public EtlFramework(EtlOptions inputs) {
+    options = inputs;
+    final JobType type = inputs.getJobType();
+    switch (type) {
+      case EXPORT:
+        constructExport();
+        break;
+      case IMPORT:
+        constructImport();
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, type.toString());
+    }
+  }
+
+  /** @return the options this framework was built from */
+  public EtlOptions getOptions() {
+    return options;
+  }
+
+  /** @return the initializer class, as supplied by the connector */
+  public Class<? extends Initializer> getInitializer() {
+    return initializer;
+  }
+
+  /** @return the partitioner class; not assigned for export jobs */
+  public Class<? extends Partitioner> getPartitioner() {
+    return partitioner;
+  }
+
+  /** @return the extractor class; not assigned for export jobs */
+  public Class<? extends Extractor> getExtractor() {
+    return extractor;
+  }
+
+  /** @return the loader class */
+  public Class<? extends Loader> getLoader() {
+    return loader;
+  }
+
+  /** @return the destroyer class, as supplied by the connector */
+  public Class<? extends Destroyer> getDestroyer() {
+    return destroyer;
+  }
+
+  /** @return whether field names are required; never set to true in this class */
+  public boolean isFieldNamesRequired() {
+    return requireFieldNames;
+  }
+
+  /** @return whether the chosen loader needs an output directory */
+  public boolean isOutputDirectoryRequired() {
+    return requireOutputDirectory;
+  }
+
+  // Import: every stage except the loader comes from the connector's
+  // importer; the loader is chosen from the storage and format types.
+  private void constructImport() {
+    final Importer source = options.getConnector().getImporter();
+    initializer = source.getInitializer();
+    partitioner = source.getPartitioner();
+    extractor = source.getExtractor();
+    destroyer = source.getDestroyer();
+
+    final StorageType storage = options.getStorageType();
+    switch (storage) {
+      case HDFS:
+        loader = chooseHdfsImportLoader(options.getFormatType());
+        // Both HDFS loaders write into an output directory.
+        requireOutputDirectory = true;
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, storage.toString());
+    }
+  }
+
+  // Maps an output format onto the framework loader that writes it to HDFS.
+  private Class<? extends Loader> chooseHdfsImportLoader(FormatType format) {
+    switch (format) {
+      case TEXT:
+        return HdfsTextImportLoader.class;
+      case SEQUENCE:
+        return HdfsSequenceImportLoader.class;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, format.toString());
+    }
+  }
+
+  // Export: initializer, loader and destroyer come from the connector's
+  // exporter; partitioner and extractor are left unassigned for now.
+  private void constructExport() {
+    final Exporter target = options.getConnector().getExporter();
+    initializer = target.getInitializer();
+    loader = target.getLoader();
+    destroyer = target.getDestroyer();
+
+    // FIXME: set partitioner/extractor based on storage/format types
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
new file mode 100644
index 0000000..e45c0ff
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.util.HashMap;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * Typed view over the user-supplied job options.
+ *
+ * Raw options are kept as strings in {@code store}; the typed getters parse
+ * them on first use and remember the result. Changing any option through
+ * {@link #setOption(String, String)} discards the remembered values, so the
+ * getters always reflect the current contents of the store.
+ */
+public class EtlOptions implements Options {
+
+  HashMap<String, String> store = new HashMap<String, String>();
+
+  /**
+   * @param connector the connector that supplies the importer/exporter
+   */
+  public EtlOptions(SqoopConnector connector) {
+    this.connector = connector;
+  }
+
+  private SqoopConnector connector;
+
+  /** @return the connector given at construction time */
+  public SqoopConnector getConnector() {
+    return connector;
+  }
+
+  private JobType jobType = null;
+
+  public enum JobType {
+    IMPORT,
+    EXPORT
+  }
+
+  /**
+   * @return the job type; IMPORT when the option is absent
+   * @throws SqoopException if the option holds an unrecognised value
+   */
+  public JobType getJobType() {
+    if (jobType != null) {
+      return jobType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_JOB_TYPE);
+    if (option == null || option.equalsIgnoreCase("IMPORT")) {
+      jobType = JobType.IMPORT;
+    } else if (option.equalsIgnoreCase("EXPORT")) {
+      jobType = JobType.EXPORT;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return jobType;
+  }
+
+  private StorageType storageType = null;
+
+  public enum StorageType {
+    HDFS
+  }
+
+  /**
+   * @return the storage type; HDFS when the option is absent
+   * @throws SqoopException if the option holds an unrecognised value
+   */
+  public StorageType getStorageType() {
+    if (storageType != null) {
+      return storageType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_STORAGE_TYPE);
+    if (option == null || option.equalsIgnoreCase("HDFS")) {
+      storageType = StorageType.HDFS;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return storageType;
+  }
+
+  private FormatType formatType = null;
+
+  public enum FormatType {
+    TEXT,
+    SEQUENCE
+  }
+
+  /**
+   * @return the format type; TEXT when the option is absent
+   * @throws SqoopException if the option holds an unrecognised value
+   */
+  public FormatType getFormatType() {
+    if (formatType != null) {
+      return formatType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_FORMAT_TYPE);
+    if (option == null || option.equalsIgnoreCase("TEXT")) {
+      formatType = FormatType.TEXT;
+    } else if (option.equalsIgnoreCase("SEQUENCE")) {
+      formatType = FormatType.SEQUENCE;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return formatType;
+  }
+
+  /** @return the raw output codec option, or null when none was set */
+  public String getOutputCodec() {
+    return store.get(JobConstants.INPUT_JOB_OUTPUT_CODEC);
+  }
+
+  // -1 marks "not computed yet".
+  private int maxExtractors = -1;
+
+  /**
+   * @return the configured number of extractors, or a default that depends
+   *         on the job type (4 for import, 1 for export)
+   * @throws SqoopException if the option is not a valid integer
+   */
+  public int getMaxExtractors() {
+    if (maxExtractors != -1) {
+      return maxExtractors;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_MAX_EXTRACTORS);
+    if (option != null) {
+      maxExtractors = parseCount(option);
+    } else {
+      JobType type = getJobType();
+      switch (type) {
+        case IMPORT:
+          maxExtractors = 4;
+          break;
+        case EXPORT:
+          maxExtractors = 1;
+          break;
+        default:
+          throw new SqoopException(CoreError.CORE_0012, type.toString());
+      }
+    }
+    return maxExtractors;
+  }
+
+  // -1 marks "not computed yet".
+  private int maxLoaders = -1;
+
+  /**
+   * @return the configured number of loaders, or a default that depends
+   *         on the job type (1 for import, 4 for export)
+   * @throws SqoopException if the option is not a valid integer
+   */
+  public int getMaxLoaders() {
+    if (maxLoaders != -1) {
+      return maxLoaders;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_MAX_LOADERS);
+    if (option != null) {
+      maxLoaders = parseCount(option);
+    } else {
+      JobType type = getJobType();
+      switch (type) {
+        case IMPORT:
+          maxLoaders = 1;
+          break;
+        case EXPORT:
+          maxLoaders = 4;
+          break;
+        default:
+          throw new SqoopException(CoreError.CORE_0012, type.toString());
+      }
+    }
+    return maxLoaders;
+  }
+
+  /**
+   * Stores a raw option and forgets every memoized typed value, so that a
+   * getter called afterwards re-reads the store instead of returning a
+   * value computed from the previous options.
+   *
+   * @param key the option name
+   * @param value the raw option value
+   */
+  public void setOption(String key, String value) {
+    store.put(key, value);
+    jobType = null;
+    storageType = null;
+    formatType = null;
+    maxExtractors = -1;
+    maxLoaders = -1;
+  }
+
+  @Override
+  public String getOption(String key) {
+    return store.get(key);
+  }
+
+  // Parses a numeric option, reporting a malformed value the same way the
+  // other getters report an unrecognised one.
+  private static int parseCount(String option) {
+    try {
+      return Integer.parseInt(option);
+    } catch (NumberFormatException e) {
+      throw new SqoopException(CoreError.CORE_0012, option, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
new file mode 100644
index 0000000..bd4c108
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.EtlFramework;
+import org.apache.sqoop.job.etl.EtlMutableContext;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.etl.EtlOptions.JobType;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Runs one ETL job as a MapReduce job.
+ *
+ * Lifecycle: {@link #initialize()} fills the Hadoop configuration and runs
+ * the connector initializer, {@link #run()} submits the job and waits for
+ * it, and {@link #destroy()} runs the connector destroyer.
+ */
+public class MrExecution {
+
+  private Configuration conf;
+  private EtlFramework etl;
+
+  /**
+   * @param etl the stage classes and options describing the job
+   */
+  public MrExecution(EtlFramework etl) {
+    this.conf = new Configuration();
+    this.etl = etl;
+  }
+
+  /**
+   * Copies the job settings into the configuration, runs the initializer
+   * (if the framework has one) and validates the resulting configuration.
+   *
+   * @throws SqoopException if the initializer cannot be instantiated, the
+   *         job type is not handled, or a required setting is missing
+   */
+  public void initialize() {
+    EtlOptions options = etl.getOptions();
+
+    conf.setInt(JobConstants.JOB_ETL_NUMBER_PARTITIONS,
+        options.getMaxExtractors());
+
+    if (options.getOutputCodec() != null) {
+      conf.setBoolean(FileOutputFormat.COMPRESS, true);
+      conf.set(FileOutputFormat.COMPRESS_CODEC, options.getOutputCodec());
+    }
+
+    // NOTE(review): EtlFramework.constructExport() does not assign a
+    // partitioner or extractor yet, so these calls would throw a
+    // NullPointerException for EXPORT jobs - confirm export is not
+    // reachable until that FIXME is resolved.
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, etl.getPartitioner().getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, etl.getExtractor().getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, etl.getLoader().getName());
+
+    EtlMutableContext context = new EtlMutableContext(conf);
+
+    Class<? extends Initializer> initializer = etl.getInitializer();
+    if (initializer != null) {
+      Initializer instance;
+      try {
+        instance = initializer.newInstance();
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, initializer.getName(), e);
+      }
+      instance.run(context, options);
+    }
+
+    JobType jobType = options.getJobType();
+    switch (jobType) {
+      case IMPORT:
+        checkImportConfiguration(context);
+        break;
+      case EXPORT:
+        checkExportConfiguration(context);
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, jobType.toString());
+    }
+  }
+
+  /**
+   * Configures the MapReduce job, submits it and blocks until it finishes.
+   *
+   * @throws SqoopException (CORE_0008) if the job cannot be submitted, is
+   *         interrupted, or completes unsuccessfully
+   */
+  public void run() {
+    EtlOptions options = etl.getOptions();
+
+    try {
+      Job job = Job.getInstance(conf);
+
+      job.setInputFormatClass(SqoopInputFormat.class);
+      job.setMapperClass(SqoopMapper.class);
+      job.setMapOutputKeyClass(Data.class);
+      job.setMapOutputValueClass(NullWritable.class);
+      if (options.getMaxLoaders() > 1) {
+        job.setReducerClass(SqoopReducer.class);
+        job.setNumReduceTasks(options.getMaxLoaders());
+      }
+      job.setOutputFormatClass((etl.isOutputDirectoryRequired()) ?
+          SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
+      job.setOutputKeyClass(Data.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        throw new SqoopException(CoreError.CORE_0008);
+      }
+
+    } catch (SqoopException e) {
+      // Already the error we want to report; do not wrap it a second time.
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      throw new SqoopException(CoreError.CORE_0008, e);
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0008, e);
+    }
+  }
+
+  /**
+   * Runs the destroyer, if the framework has one.
+   *
+   * @throws SqoopException if the destroyer cannot be instantiated
+   */
+  public void destroy() {
+    Class<? extends Destroyer> destroyer = etl.getDestroyer();
+    if (destroyer != null) {
+      Destroyer instance;
+      try {
+        instance = destroyer.newInstance();
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, destroyer.getName(), e);
+      }
+      instance.run(new EtlContext(conf));
+    }
+  }
+
+  // Verifies the settings an import needs and forwards the output directory
+  // to the file output format.
+  private void checkImportConfiguration(EtlMutableContext context) {
+    if (etl.isFieldNamesRequired() &&
+        context.getString(JobConstants.JOB_ETL_FIELD_NAMES) == null) {
+      throw new SqoopException(CoreError.CORE_0020, "field names");
+    }
+
+    if (etl.isOutputDirectoryRequired()) {
+      String outputDirectory =
+          context.getString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY);
+      if (outputDirectory == null) {
+        throw new SqoopException(CoreError.CORE_0020, "output directory");
+      } else {
+        context.setString(FileOutputFormat.OUTDIR, outputDirectory);
+      }
+    }
+  }
+
+  private void checkExportConfiguration(EtlMutableContext context) {
+    // TODO: check export related configuration
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
new file mode 100644
index 0000000..d236148
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Reducer used by Sqoop MapReduce jobs; it consumes and emits
+ * (Data, NullWritable) pairs.
+ *
+ * The class overrides nothing, so the behaviour inherited from Hadoop's
+ * Reducer base class is what runs. The only member it adds is a shared
+ * logger named after this class.
+ */
+public class SqoopReducer
+ extends Reducer<Data, NullWritable, Data, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(SqoopReducer.class.getName());
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
new file mode 100644
index 0000000..e653c22
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.JobEngine;
+import org.apache.sqoop.job.etl.Context;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.MutableContext;
+import org.apache.sqoop.job.etl.Options;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob.Type;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Validator;
+import org.junit.Test;
+
+/**
+ * End-to-end test of {@link JobEngine} for an import job, using a dummy
+ * connector whose extractor emits predictable rows and checking the text
+ * file the job writes.
+ */
+public class TestJobEngine extends TestCase {
+
+  private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
+  private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
+
+  private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
+  private static final String OUTPUT_FILE = "part-r-00000";
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+
+  /** Runs an import with default options and verifies the output file. */
+  @Test
+  public void testImport() throws Exception {
+    FileUtils.delete(OUTPUT_DIR);
+
+    DummyConnector connector = new DummyConnector();
+    EtlOptions options = new EtlOptions(connector);
+
+    JobEngine engine = new JobEngine();
+    engine.run(options);
+
+    String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
+    InputStream filestream = FileUtils.open(fileName);
+    BufferedReader filereader = new BufferedReader(new InputStreamReader(
+        filestream, Data.CHARSET_NAME));
+    verifyOutput(filereader);
+  }
+
+  // Checks that the reader yields the expected rows in ascending order and
+  // that the total row count matches. The reader is closed even when an
+  // assertion fails part-way through.
+  private void verifyOutput(BufferedReader reader)
+      throws IOException {
+    int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
+    Data expected = new Data();
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        expected.setContent(new Object[] {
+            Integer.valueOf(index),
+            Double.valueOf(index),
+            String.valueOf(index) },
+            Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(expected.toString(), line);
+      }
+    } finally {
+      reader.close();
+    }
+
+    assertEquals(NUMBER_OF_PARTITIONS * NUMBER_OF_ROWS_PER_PARTITION,
+        index - START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION);
+  }
+
+  /** Connector that only supports import; every other accessor fails the test. */
+  public class DummyConnector implements SqoopConnector {
+
+    @Override
+    public Importer getImporter() {
+      return new Importer(
+          DummyImportInitializer.class,
+          DummyImportPartitioner.class,
+          DummyImportExtractor.class,
+          null);
+    }
+
+    @Override
+    public Exporter getExporter() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public ResourceBundle getBundle(Locale locale) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Validator getValidator() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getConnectionConfigurationClass() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getJobConfigurationClass(Type jobType) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+  }
+
+  /** Initializer that points the job at the test output directory. */
+  public static class DummyImportInitializer extends Initializer {
+    @Override
+    public void run(MutableContext context, Options options) {
+      context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
+    }
+  }
+
+  /** Partition identified by a single integer id. */
+  public static class DummyImportPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  /** Partitioner producing ids START_PARTITION..NUMBER_OF_PARTITIONS inclusive. */
+  public static class DummyImportPartitioner extends Partitioner {
+    @Override
+    public List<Partition> run(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyImportPartition partition = new DummyImportPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  /** Extractor writing (int, double, string) rows derived from the partition id. */
+  public static class DummyImportExtractor extends Extractor {
+    @Override
+    public void run(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyImportPartition) partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        int value = id * NUMBER_OF_ROWS_PER_PARTITION + row;
+        writer.writeArrayRecord(new Object[] {
+            Integer.valueOf(value),
+            Double.valueOf(value),
+            String.valueOf(value)});
+      }
+    }
+  }
+
+}