You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/06 21:53:35 UTC
[2/5] git commit: SQOOP-666 Introduce execution engine (Jarek Jarcec
Cecho)
SQOOP-666 Introduce execution engine
(Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/06e054bc
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/06e054bc
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/06e054bc
Branch: refs/heads/sqoop2
Commit: 06e054bc8d5350f1f15d88a0c6e9b6b236820327
Parents: 25f3fd3
Author: Bilung Lee <bl...@apache.org>
Authored: Tue Nov 6 12:43:03 2012 -0800
Committer: Bilung Lee <bl...@apache.org>
Committed: Tue Nov 6 12:43:03 2012 -0800
----------------------------------------------------------------------
core/pom.xml | 14 +-
.../apache/sqoop/framework/ExecutionEngine.java | 72 +++
.../apache/sqoop/framework/FrameworkConstants.java | 9 +
.../org/apache/sqoop/framework/FrameworkError.java | 4 +
.../apache/sqoop/framework/FrameworkManager.java | 136 ++++--
.../apache/sqoop/framework/SubmissionEngine.java | 9 +
.../apache/sqoop/framework/SubmissionRequest.java | 60 +---
.../java/org/apache/sqoop/job/JobConstants.java | 82 ----
.../java/org/apache/sqoop/job/PrefixContext.java | 62 ---
.../sqoop/job/etl/HdfsSequenceImportLoader.java | 108 ----
.../apache/sqoop/job/etl/HdfsTextImportLoader.java | 103 ----
.../main/java/org/apache/sqoop/job/io/Data.java | 378 ---------------
.../java/org/apache/sqoop/job/io/FieldTypes.java | 42 --
.../apache/sqoop/job/mr/ConfigurationUtils.java | 65 ---
.../apache/sqoop/job/mr/SqoopFileOutputFormat.java | 69 ---
.../org/apache/sqoop/job/mr/SqoopInputFormat.java | 118 -----
.../java/org/apache/sqoop/job/mr/SqoopMapper.java | 109 -----
.../apache/sqoop/job/mr/SqoopNullOutputFormat.java | 77 ---
.../job/mr/SqoopOutputFormatLoadExecutor.java | 228 ---------
.../java/org/apache/sqoop/job/mr/SqoopReducer.java | 35 --
.../java/org/apache/sqoop/job/mr/SqoopSplit.java | 82 ----
.../test/java/org/apache/sqoop/io/TestData.java | 76 ---
.../test/java/org/apache/sqoop/job/FileUtils.java | 69 ---
.../test/java/org/apache/sqoop/job/JobUtils.java | 69 ---
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 232 ---------
.../java/org/apache/sqoop/job/TestJobEngine.java | 196 --------
.../java/org/apache/sqoop/job/TestMapReduce.java | 229 ---------
dist/src/main/server/conf/sqoop.properties | 5 +
execution/mapreduce/pom.xml | 67 +++
.../execution/mapreduce/MRSubmissionRequest.java | 110 +++++
.../mapreduce/MapreduceExecutionEngine.java | 74 +++
.../java/org/apache/sqoop/job/JobConstants.java | 82 ++++
.../java/org/apache/sqoop/job/PrefixContext.java | 62 +++
.../sqoop/job/etl/HdfsSequenceImportLoader.java | 108 ++++
.../apache/sqoop/job/etl/HdfsTextImportLoader.java | 103 ++++
.../main/java/org/apache/sqoop/job/io/Data.java | 378 +++++++++++++++
.../java/org/apache/sqoop/job/io/FieldTypes.java | 42 ++
.../apache/sqoop/job/mr/ConfigurationUtils.java | 65 +++
.../apache/sqoop/job/mr/SqoopFileOutputFormat.java | 69 +++
.../org/apache/sqoop/job/mr/SqoopInputFormat.java | 118 +++++
.../java/org/apache/sqoop/job/mr/SqoopMapper.java | 109 +++++
.../apache/sqoop/job/mr/SqoopNullOutputFormat.java | 77 +++
.../job/mr/SqoopOutputFormatLoadExecutor.java | 228 +++++++++
.../java/org/apache/sqoop/job/mr/SqoopReducer.java | 35 ++
.../java/org/apache/sqoop/job/mr/SqoopSplit.java | 82 ++++
.../test/java/org/apache/sqoop/job/FileUtils.java | 69 +++
.../test/java/org/apache/sqoop/job/JobUtils.java | 69 +++
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 232 +++++++++
.../java/org/apache/sqoop/job/TestJobEngine.java | 196 ++++++++
.../java/org/apache/sqoop/job/TestMapReduce.java | 229 +++++++++
.../java/org/apache/sqoop/job/io/TestData.java | 75 +++
execution/pom.xml | 36 ++
pom.xml | 1 +
submission/mapreduce/pom.xml | 6 +
.../mapreduce/MapreduceSubmissionEngine.java | 20 +-
55 files changed, 2941 insertions(+), 2539 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 028c240..0732b2c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,29 +36,23 @@ limitations under the License.
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
</dependency>
+
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
</dependency>
+
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <scope>provided</scope>
- </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
new file mode 100644
index 0000000..e1ccdf6
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Execution engine drives execution of sqoop submission (job). It's responsible
+ * for executing all defined steps in the import/export workflow.
+ */
+public abstract class ExecutionEngine {
+
+ /**
+ * Initialize execution engine
+ *
+ * @param context Configuration context
+ */
+ public void initialize(ImmutableContext context, String prefix) {
+ }
+
+ /**
+ * Destroy execution engine when stopping server
+ */
+ public void destroy() {
+ }
+
+ /**
+ * Return new SubmissionRequest class or any subclass if it's needed by
+ * execution and submission engine combination.
+ *
+ * @param summary Submission summary
+ * @param connector Appropriate connector structure
+ * @param connectorConnection Connector connection configuration
+ * @param connectorJob Connector job configuration
+ * @param frameworkConnection Framework connection configuration
+ * @param frameworkJob Framework job configuration
+ * @return New Submission request object
+ */
+ public SubmissionRequest createSubmissionRequest(MSubmission summary,
+ SqoopConnector connector,
+ Object connectorConnection,
+ Object connectorJob,
+ Object frameworkConnection,
+ Object frameworkJob) {
+ return new SubmissionRequest(summary, connector,
+ connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+ }
+
+ /**
+ * Prepare given submission request for import submission.
+ *
+ * @param request Submission request
+ */
+ public abstract void prepareImportSubmission(SubmissionRequest request);
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
index d6e70ca..32da4e8 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
@@ -29,6 +29,9 @@ public final class FrameworkConstants {
public static final String PREFIX_SUBMISSION_CONFIG =
ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
+ public static final String PREFIX_EXECUTION_CONFIG =
+ ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
+
public static final String SYSCFG_SUBMISSION_ENGINE =
PREFIX_SUBMISSION_CONFIG + "engine";
@@ -50,6 +53,12 @@ public final class FrameworkConstants {
public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
+ public static final String SYSCFG_EXECUTION_ENGINE =
+ PREFIX_EXECUTION_CONFIG + "engine";
+
+ public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
+ SYSCFG_EXECUTION_ENGINE + ".";
+
// Connection/Job Configuration forms
public static final String FORM_SECURITY =
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
index 19d0d87..4277311 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
@@ -38,6 +38,10 @@ public enum FrameworkError implements ErrorCode {
FRAMEWORK_0006("Can't bootstrap job"),
+ FRAMEWORK_0007("Invalid execution engine"),
+
+ FRAMEWORK_0008("Invalid combination of submission and execution engines"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 604d403..7e10ddc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.framework;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -27,11 +26,8 @@ import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
@@ -57,47 +53,93 @@ import java.util.ResourceBundle;
/**
* Manager for Sqoop framework itself.
*
- * All Sqoop internals (job execution engine, metadata) should be handled
- * within this manager.
+ * All Sqoop internals are handled in this class:
+ * * Submission engine
+ * * Execution engine
+ * * Framework metadata
*
* Current implementation of entire submission engine is using repository
- * for keep of current track, so that server might be restarted at any time
- * without any affect on running jobs. This approach however might not be the
- * fastest way and we might want to introduce internal structures with running
- * jobs in case that this approach will be too slow.
+ * for keeping track of running submissions. Thus, server might be restarted at
+ * any time without any effect on running jobs. This approach however might not
+ * be the fastest way and we might want to introduce internal structures for
+ * running jobs in case that this approach will be too slow.
*/
public final class FrameworkManager {
private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
+ /**
+ * Default interval for purging old submissions from repository.
+ */
private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
+ /**
+ * Default sleep interval for purge thread.
+ */
private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
+ /**
+ * Default interval for update thread.
+ */
private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
+ /**
+ * Framework metadata structures in MForm format
+ */
private static MFramework mFramework;
+ /**
+ * Validator instance
+ */
private static final Validator validator;
+ /**
+ * Configured submission engine instance
+ */
private static SubmissionEngine submissionEngine;
+ /**
+ * Configured execution engine instance
+ */
+ private static ExecutionEngine executionEngine;
+
+ /**
+ * Purge thread that will periodically remove old submissions from repository.
+ */
private static PurgeThread purgeThread = null;
+ /**
+ * Update thread that will periodically check status of running submissions.
+ */
private static UpdateThread updateThread = null;
+ /**
+ * Synchronization variable between threads.
+ */
private static boolean running = true;
+ /**
+ * Specifies how old submissions should be removed from repository.
+ */
private static long purgeThreshold;
+ /**
+ * Number of milliseconds for purge thread to sleep.
+ */
private static long purgeSleep;
+ /**
+ * Number of milliseconds for update thread to sleep.
+ */
private static long updateSleep;
+ /**
+ * Mutex for creating new submissions. We're not allowing more than one
+ * running submission for one job.
+ */
private static final Object submissionMutex = new Object();
static {
-
MConnectionForms connectionForms = new MConnectionForms(
FormUtils.toForms(getConnectionConfigurationClass())
);
@@ -123,22 +165,31 @@ public final class FrameworkManager {
String submissionEngineClassName =
context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
- Class<?> submissionEngineClass =
- ClassUtils.loadClass(submissionEngineClassName);
-
- if (submissionEngineClass == null) {
+ submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
+ if(submissionEngine == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0001,
- submissionEngineClassName);
+ submissionEngineClassName);
}
- try {
- submissionEngine = (SubmissionEngine)submissionEngineClass.newInstance();
- } catch (Exception ex) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0001,
- submissionEngineClassName, ex);
+ submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+ // Execution engine
+ String executionEngineClassName =
+ context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+
+ executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
+ if(executionEngine == null) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+ executionEngineClassName);
}
- submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+ // We need to make sure that user has configured compatible combination of
+ // submission engine and execution engine
+ if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0008);
+ }
+
+ executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
// Set up worker threads
purgeThreshold = context.getLong(
@@ -161,7 +212,6 @@ public final class FrameworkManager {
updateThread = new UpdateThread();
updateThread.start();
-
LOG.info("Submission manager initialized: OK");
}
@@ -189,6 +239,10 @@ public final class FrameworkManager {
if(submissionEngine != null) {
submissionEngine.destroy();
}
+
+ if(executionEngine != null) {
+ executionEngine.destroy();
+ }
}
public static Validator getValidator() {
@@ -253,22 +307,26 @@ public final class FrameworkManager {
// Create request object
MSubmission summary = new MSubmission(jobId);
- SubmissionRequest request = new SubmissionRequest(summary, connector,
- connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+ SubmissionRequest request = executionEngine.createSubmissionRequest(
+ summary, connector,
+ connectorConnection, connectorJob,
+ frameworkConnection, frameworkJob);
request.setJobName(job.getName());
// Let's register all important jars
// sqoop-common
- request.addJar(ClassUtils.jarForClass(MapContext.class));
+ request.addJarForClass(MapContext.class);
// sqoop-core
- request.addJar(ClassUtils.jarForClass(FrameworkManager.class));
+ request.addJarForClass(FrameworkManager.class);
// sqoop-spi
- request.addJar(ClassUtils.jarForClass(SqoopConnector.class));
- // particular connector in use
- request.addJar(ClassUtils.jarForClass(connector.getClass()));
+ request.addJarForClass(SqoopConnector.class);
+ // Execution engine jar
+ request.addJarForClass(executionEngine.getClass());
+ // Connector in use
+ request.addJarForClass(connector.getClass());
// Extra libraries that Sqoop code requires
- request.addJar(ClassUtils.jarForClass(JSONValue.class));
+ request.addJarForClass(JSONValue.class);
switch (job.getType()) {
case IMPORT:
@@ -308,7 +366,7 @@ public final class FrameworkManager {
// Bootstrap job from framework perspective
switch (job.getType()) {
case IMPORT:
- bootstrapImportSubmission(request);
+ prepareImportSubmission(request);
break;
case EXPORT:
// TODO(jarcec): Implement export path
@@ -342,22 +400,14 @@ public final class FrameworkManager {
return summary;
}
- private static void bootstrapImportSubmission(SubmissionRequest request) {
- Importer importer = (Importer)request.getConnectorCallbacks();
+ private static void prepareImportSubmission(SubmissionRequest request) {
ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
// Initialize the map-reduce part (all sort of required classes, ...)
request.setOutputDirectory(jobConfiguration.outputDirectory);
- // Defaults for classes are mostly fine for now.
-
-
- // Set up framework context
- MutableMapContext context = request.getFrameworkContext();
- context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
- context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
- context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
- context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+ // Delegate rest of the job to execution engine
+ executionEngine.prepareImportSubmission(request);
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
index f4ad3f5..71e4ec9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
@@ -42,6 +42,15 @@ public abstract class SubmissionEngine {
}
/**
+ * Callback to verify that configured submission engine and execution engine
+ * are compatible.
+ *
+ * @param executionEngineClass Configured execution class.
+ * @return True if such execution engine is supported
+ */
+ public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
+
+ /**
* Submit new job to remote (hadoop) cluster. This method *must* fill
* submission.getSummary.setExternalId(), otherwise Sqoop framework won't
* be able to track progress on this job!
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 27b0566..c70a5cc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -17,15 +17,11 @@
*/
package org.apache.sqoop.framework;
-import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
-import org.apache.sqoop.job.mr.SqoopInputFormat;
-import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
import java.util.LinkedList;
import java.util.List;
@@ -85,20 +81,6 @@ public class SubmissionRequest {
*/
String outputDirectory;
- /**
- * Map-reduce specific options.
- *
- * I'm using strings so that this class won't have direct dependency on
- * hadoop libraries.
- */
- Class inputFormatClass;
- Class mapperClass;
- Class mapOutputKeyClass;
- Class mapOutputValueClass;
- Class outputFormatClass;
- Class outputKeyClass;
- Class outputValueClass;
-
public SubmissionRequest(MSubmission submission,
SqoopConnector connector,
@@ -115,15 +97,6 @@ public class SubmissionRequest {
this.configConnectorJob = configConnectorJob;
this.configFrameworkConnection = configFrameworkConnection;
this.configFrameworkJob = configFrameworkJob;
-
- // TODO(Jarcec): Move this to job execution engine
- this.inputFormatClass = SqoopInputFormat.class;
- this.mapperClass = SqoopMapper.class;
- this.mapOutputKeyClass = Data.class;
- this.mapOutputValueClass = NullWritable.class;
- this.outputFormatClass = SqoopFileOutputFormat.class;
- this.outputKeyClass = Data.class;
- this.outputValueClass = NullWritable.class;
}
public MSubmission getSummary() {
@@ -150,6 +123,10 @@ public class SubmissionRequest {
jars.add(jar);
}
+ public void addJarForClass(Class klass) {
+ jars.add(ClassUtils.jarForClass(klass));
+ }
+
public void addJars(List<String> jars) {
this.jars.addAll(jars);
}
@@ -193,31 +170,4 @@ public class SubmissionRequest {
public void setOutputDirectory(String outputDirectory) {
this.outputDirectory = outputDirectory;
}
- public Class getInputFormatClass() {
- return inputFormatClass;
- }
-
- public Class getMapperClass() {
- return mapperClass;
- }
-
- public Class getMapOutputKeyClass() {
- return mapOutputKeyClass;
- }
-
- public Class getMapOutputValueClass() {
- return mapOutputValueClass;
- }
-
- public Class getOutputFormatClass() {
- return outputFormatClass;
- }
-
- public Class getOutputKeyClass() {
- return outputKeyClass;
- }
-
- public Class getOutputValueClass() {
- return outputValueClass;
- }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
deleted file mode 100644
index 19ac91e..0000000
--- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-
-public final class JobConstants extends Constants {
- /**
- * All job related configuration is prefixed with this:
- * <tt>org.apache.sqoop.job.</tt>
- */
- public static final String PREFIX_JOB_CONFIG =
- ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
-
-
- public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
- + "etl.partitioner";
-
- public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
- + "etl.extractor";
-
- public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
- + "etl.loader";
-
- public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
- + "etl.destroyer";
-
-
- public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
- + "mr.output.file";
-
- public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
- + "mr.output.codec";
-
-
- public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
- PREFIX_JOB_CONFIG + "config.class.connector.connection";
-
- public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
- PREFIX_JOB_CONFIG + "config.class.connector.job";
-
- public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
- PREFIX_JOB_CONFIG + "config.class.framework.connection";
-
- public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
- PREFIX_JOB_CONFIG + "config.class.framework.job";
-
- public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
- PREFIX_JOB_CONFIG + "config.connector.connection";
-
- public static final String JOB_CONFIG_CONNECTOR_JOB =
- PREFIX_JOB_CONFIG + "config.connector.job";
-
- public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
- PREFIX_JOB_CONFIG + "config.framework.connection";
-
- public static final String JOB_CONFIG_FRAMEWORK_JOB =
- PREFIX_JOB_CONFIG + "config.framework.job";
-
- public static final String PREFIX_CONNECTOR_CONTEXT =
- PREFIX_JOB_CONFIG + "connector.context.";
-
-
- private JobConstants() {
- // Disable explicit object creation
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/PrefixContext.java b/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
deleted file mode 100644
index 5488b46..0000000
--- a/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Implementation of immutable context that is based on Hadoop configuration
- * object. Each context property is prefixed with special prefix and loaded
- * directly.
- */
-public class PrefixContext implements ImmutableContext {
-
- Configuration configuration;
- String prefix;
-
- public PrefixContext(Configuration configuration, String prefix) {
- this.configuration = configuration;
- this.prefix = prefix;
- }
-
- @Override
- public String getString(String key) {
- return configuration.get(prefix + key);
- }
-
- @Override
- public String getString(String key, String defaultValue) {
- return configuration.get(prefix + key, defaultValue);
- }
-
- @Override
- public long getLong(String key, long defaultValue) {
- return configuration.getLong(prefix + key, defaultValue);
- }
-
- @Override
- public int getInt(String key, int defaultValue) {
- return configuration.getInt(prefix + key, defaultValue);
- }
-
- @Override
- public boolean getBoolean(String key, boolean defaultValue) {
- return configuration.getBoolean(prefix + key, defaultValue);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
deleted file mode 100644
index 1235d1d..0000000
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsSequenceImportLoader extends Loader {
-
- public static final String EXTENSION = ".seq";
-
- private final char fieldDelimiter;
-
- public HdfsSequenceImportLoader() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
- @Override
- public void run(ImmutableContext context, DataReader reader) {
- reader.setFieldDelimiter(fieldDelimiter);
-
- Configuration conf = new Configuration();
-// Configuration conf = ((EtlContext)context).getConfiguration();
- String filename =
- context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
- String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
- CompressionCodec codec = null;
- if (codecname != null) {
- Class<?> clz = ClassUtils.loadClass(codecname);
- if (clz == null) {
- throw new SqoopException(CoreError.CORE_0009, codecname);
- }
-
- try {
- codec = (CompressionCodec) clz.newInstance();
- if (codec instanceof Configurable) {
- ((Configurable) codec).setConf(conf);
- }
- } catch (Exception e) {
- throw new SqoopException(CoreError.CORE_0010, codecname, e);
- }
- }
-
- filename += EXTENSION;
-
- try {
- Path filepath = new Path(filename);
- SequenceFile.Writer filewriter;
- if (codec != null) {
- filewriter = SequenceFile.createWriter(conf,
- SequenceFile.Writer.file(filepath),
- SequenceFile.Writer.keyClass(Text.class),
- SequenceFile.Writer.valueClass(NullWritable.class),
- SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
- } else {
- filewriter = SequenceFile.createWriter(conf,
- SequenceFile.Writer.file(filepath),
- SequenceFile.Writer.keyClass(Text.class),
- SequenceFile.Writer.valueClass(NullWritable.class),
- SequenceFile.Writer.compression(CompressionType.NONE));
- }
-
- String csv;
- Text text = new Text();
- while ((csv = reader.readCsvRecord()) != null) {
- text.set(csv);
- filewriter.append(text, NullWritable.get());
- }
- filewriter.close();
-
- } catch (IOException e) {
- throw new SqoopException(CoreError.CORE_0018, e);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
deleted file mode 100644
index 36aa11f..0000000
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsTextImportLoader extends Loader {
-
- private final char fieldDelimiter;
- private final char recordDelimiter;
-
- public HdfsTextImportLoader() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
- }
-
- @Override
- public void run(ImmutableContext context, DataReader reader) {
- reader.setFieldDelimiter(fieldDelimiter);
-
- Configuration conf = new Configuration();
-// Configuration conf = ((EtlContext)context).getConfiguration();
- String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
- String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
- CompressionCodec codec = null;
- if (codecname != null) {
- Class<?> clz = ClassUtils.loadClass(codecname);
- if (clz == null) {
- throw new SqoopException(CoreError.CORE_0009, codecname);
- }
-
- try {
- codec = (CompressionCodec) clz.newInstance();
- if (codec instanceof Configurable) {
- ((Configurable) codec).setConf(conf);
- }
- } catch (Exception e) {
- throw new SqoopException(CoreError.CORE_0010, codecname, e);
- }
-
- filename += codec.getDefaultExtension();
- }
-
- try {
- Path filepath = new Path(filename);
- FileSystem fs = filepath.getFileSystem(conf);
-
- BufferedWriter filewriter;
- DataOutputStream filestream = fs.create(filepath, false);
- if (codec != null) {
- filewriter = new BufferedWriter(new OutputStreamWriter(
- codec.createOutputStream(filestream, codec.createCompressor()),
- Data.CHARSET_NAME));
- } else {
- filewriter = new BufferedWriter(new OutputStreamWriter(
- filestream, Data.CHARSET_NAME));
- }
-
- String csv;
- while ((csv = reader.readCsvRecord()) != null) {
- filewriter.write(csv + recordDelimiter);
- }
- filewriter.close();
-
- } catch (IOException e) {
- throw new SqoopException(CoreError.CORE_0018, e);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java
deleted file mode 100644
index 4ddd132..0000000
--- a/core/src/main/java/org/apache/sqoop/job/io/Data.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-
-public class Data implements WritableComparable<Data> {
-
- // The content is an Object to accommodate different kinds of data.
- // For example, it can be:
- // - Object[] for an array of object record
- // - String for a text of CSV record
- private Object content = null;
-
- public static final int EMPTY_DATA = 0;
- public static final int CSV_RECORD = 1;
- public static final int ARRAY_RECORD = 2;
- private int type = EMPTY_DATA;
-
- public static final String CHARSET_NAME = "UTF-8";
-
- public static final char DEFAULT_RECORD_DELIMITER = '\n';
- public static final char DEFAULT_FIELD_DELIMITER = ',';
- public static final char DEFAULT_STRING_DELIMITER = '\'';
- public static final char DEFAULT_STRING_ESCAPE = '\\';
- private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
- private char stringDelimiter = DEFAULT_STRING_DELIMITER;
- private char stringEscape = DEFAULT_STRING_ESCAPE;
- private String escapedStringDelimiter = String.valueOf(new char[] {
- stringEscape, stringDelimiter
- });
-
- public void setFieldDelimiter(char fieldDelimiter) {
- this.fieldDelimiter = fieldDelimiter;
- }
-
- public void setContent(Object content, int type) {
- switch (type) {
- case EMPTY_DATA:
- case CSV_RECORD:
- case ARRAY_RECORD:
- this.type = type;
- this.content = content;
- break;
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- public Object getContent(int targetType) {
- switch (targetType) {
- case CSV_RECORD:
- return format();
- case ARRAY_RECORD:
- return parse();
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
- }
- }
-
- public int getType() {
- return type;
- }
-
- public boolean isEmpty() {
- return (type == EMPTY_DATA);
- }
-
- @Override
- public String toString() {
- return (String)getContent(CSV_RECORD);
- }
-
- @Override
- public int compareTo(Data other) {
- byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
- byte[] otherBytes = other.toString().getBytes(
- Charset.forName(CHARSET_NAME));
- return WritableComparator.compareBytes(
- myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Data)) {
- return false;
- }
-
- Data data = (Data)other;
- if (type != data.getType()) {
- return false;
- }
-
- return toString().equals(data.toString());
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- switch (type) {
- case CSV_RECORD:
- result += 31 * content.hashCode();
- return result;
- case ARRAY_RECORD:
- Object[] array = (Object[])content;
- for (int i = 0; i < array.length; i++) {
- result += 31 * array[i].hashCode();
- }
- return result;
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- type = readType(in);
- switch (type) {
- case CSV_RECORD:
- readCsv(in);
- break;
- case ARRAY_RECORD:
- readArray(in);
- break;
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- writeType(out, type);
- switch (type) {
- case CSV_RECORD:
- writeCsv(out);
- break;
- case ARRAY_RECORD:
- writeArray(out);
- break;
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- private int readType(DataInput in) throws IOException {
- return WritableUtils.readVInt(in);
- }
-
- private void writeType(DataOutput out, int type) throws IOException {
- WritableUtils.writeVInt(out, type);
- }
-
- private void readCsv(DataInput in) throws IOException {
- content = in.readUTF();
- }
-
- private void writeCsv(DataOutput out) throws IOException {
- out.writeUTF((String)content);
- }
-
- private void readArray(DataInput in) throws IOException {
- // read number of columns
- int columns = in.readInt();
- content = new Object[columns];
- Object[] array = (Object[])content;
- // read each column
- for (int i = 0; i < array.length; i++) {
- int type = readType(in);
- switch (type) {
- case FieldTypes.UTF:
- array[i] = in.readUTF();
- break;
-
- case FieldTypes.BIN:
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- array[i] = bytes;
- break;
-
- case FieldTypes.DOUBLE:
- array[i] = in.readDouble();
- break;
-
- case FieldTypes.FLOAT:
- array[i] = in.readFloat();
- break;
-
- case FieldTypes.LONG:
- array[i] = in.readLong();
- break;
-
- case FieldTypes.INT:
- array[i] = in.readInt();
- break;
-
- case FieldTypes.SHORT:
- array[i] = in.readShort();
- break;
-
- case FieldTypes.CHAR:
- array[i] = in.readChar();
- break;
-
- case FieldTypes.BYTE:
- array[i] = in.readByte();
- break;
-
- case FieldTypes.BOOLEAN:
- array[i] = in.readBoolean();
- break;
-
- case FieldTypes.NULL:
- array[i] = null;
- break;
-
- default:
- throw new IOException(
- new SqoopException(CoreError.CORE_0012, Integer.toString(type))
- );
- }
- }
- }
-
- private void writeArray(DataOutput out) throws IOException {
- Object[] array = (Object[])content;
- // write number of columns
- out.writeInt(array.length);
- // write each column
- for (int i = 0; i < array.length; i++) {
- if (array[i] instanceof String) {
- writeType(out, FieldTypes.UTF);
- out.writeUTF((String)array[i]);
-
- } else if (array[i] instanceof byte[]) {
- writeType(out, FieldTypes.BIN);
- out.writeInt(((byte[])array[i]).length);
- out.write((byte[])array[i]);
-
- } else if (array[i] instanceof Double) {
- writeType(out, FieldTypes.DOUBLE);
- out.writeDouble((Double)array[i]);
-
- } else if (array[i] instanceof Float) {
- writeType(out, FieldTypes.FLOAT);
- out.writeFloat((Float)array[i]);
-
- } else if (array[i] instanceof Long) {
- writeType(out, FieldTypes.LONG);
- out.writeLong((Long)array[i]);
-
- } else if (array[i] instanceof Integer) {
- writeType(out, FieldTypes.INT);
- out.writeInt((Integer)array[i]);
-
- } else if (array[i] instanceof Short) {
- writeType(out, FieldTypes.SHORT);
- out.writeShort((Short)array[i]);
-
- } else if (array[i] instanceof Character) {
- writeType(out, FieldTypes.CHAR);
- out.writeChar((Character)array[i]);
-
- } else if (array[i] instanceof Byte) {
- writeType(out, FieldTypes.BYTE);
- out.writeByte((Byte)array[i]);
-
- } else if (array[i] instanceof Boolean) {
- writeType(out, FieldTypes.BOOLEAN);
- out.writeBoolean((Boolean)array[i]);
-
- } else if (array[i] == null) {
- writeType(out, FieldTypes.NULL);
-
- } else {
- throw new IOException(
- new SqoopException(
- CoreError.CORE_0012, array[i].getClass().getName()
- )
- );
- }
- }
- }
-
- private String format() {
- switch (type) {
- case EMPTY_DATA:
- return null;
-
- case CSV_RECORD:
- if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
- return (String)content;
- } else {
- // TODO: need to exclude the case where comma is part of a string.
- return ((String)content).replaceAll(
- String.valueOf(DEFAULT_FIELD_DELIMITER),
- String.valueOf(fieldDelimiter));
- }
-
- case ARRAY_RECORD:
- StringBuilder sb = new StringBuilder();
- Object[] array = (Object[])content;
- for (int i = 0; i < array.length; i++) {
- if (i != 0) {
- sb.append(fieldDelimiter);
- }
-
- if (array[i] instanceof String) {
- sb.append(stringDelimiter);
- sb.append(escape((String)array[i]));
- sb.append(stringDelimiter);
- } else if (array[i] instanceof byte[]) {
- sb.append(Arrays.toString((byte[])array[i]));
- } else {
- sb.append(String.valueOf(array[i]));
- }
- }
- return sb.toString();
-
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- private Object[] parse() {
- switch (type) {
- case EMPTY_DATA:
- return null;
-
- case CSV_RECORD:
- ArrayList<Object> list = new ArrayList<Object>();
- // todo: need to parse CSV into Array
- return list.toArray();
-
- case ARRAY_RECORD:
- return (Object[])content;
-
- default:
- throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
- }
- }
-
- private String escape(String string) {
- // TODO: Also need to escape those special characters as documented in:
- // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
- String regex = String.valueOf(stringDelimiter);
- String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
- return string.replaceAll(regex, replacement);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
deleted file mode 100644
index e96dc6e..0000000
--- a/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-public final class FieldTypes {
-
- public static final int NULL = 0;
-
- public static final int BOOLEAN = 1;
-
- public static final int BYTE = 10;
- public static final int CHAR = 11;
-
- public static final int SHORT = 20;
- public static final int INT = 21;
- public static final int LONG = 22;
-
- public static final int FLOAT = 50;
- public static final int DOUBLE = 51;
-
- public static final int BIN = 100;
- public static final int UTF = 101;
-
- private FieldTypes() {
- // Disable explicit object creation
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
deleted file mode 100644
index 59baaf6..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * Helper class to load configuration specific objects from job configuration
- */
-public final class ConfigurationUtils {
-
- public static Object getConnectorConnection(Configuration configuration) {
- return loadConfiguration(configuration,
- JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
- JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
- }
-
- public static Object getConnectorJob(Configuration configuration) {
- return loadConfiguration(configuration,
- JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
- JobConstants.JOB_CONFIG_CONNECTOR_JOB);
- }
-
- public static Object getFrameworkConnection(Configuration configuration) {
- return loadConfiguration(configuration,
- JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
- JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
- }
-
- public static Object getFrameworkJob(Configuration configuration) {
- return loadConfiguration(configuration,
- JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
- JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
- }
-
- private static Object loadConfiguration(Configuration configuration,
- String classProperty,
- String valueProperty) {
- Object object = ClassUtils.instantiate(configuration.get(classProperty));
- FormUtils.fillValues(configuration.get(valueProperty), object);
- return object;
- }
-
- private ConfigurationUtils() {
- // Instantiation is prohibited
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
deleted file mode 100644
index c465f10..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * An output format for MapReduce job.
- */
-public class SqoopFileOutputFormat
- extends FileOutputFormat<Data, NullWritable> {
-
- public static final Log LOG =
- LogFactory.getLog(SqoopFileOutputFormat.class.getName());
-
- public static final Class<? extends CompressionCodec> DEFAULT_CODEC =
- DefaultCodec.class;
-
- @Override
- public RecordWriter<Data, NullWritable> getRecordWriter(
- TaskAttemptContext context) throws IOException {
- Configuration conf = context.getConfiguration();
-
- Path filepath = getDefaultWorkFile(context, "");
- String filename = filepath.toString();
- conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
-
- boolean isCompressed = getCompressOutput(context);
- if (isCompressed) {
- String codecname =
- conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
- conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
- }
-
- SqoopOutputFormatLoadExecutor executor =
- new SqoopOutputFormatLoadExecutor(context);
- return executor.getRecordWriter();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
deleted file mode 100644
index 8fcdc99..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * An InputFormat for MapReduce job.
- */
-public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
-
- public static final Log LOG =
- LogFactory.getLog(SqoopInputFormat.class.getName());
-
- @Override
- public RecordReader<SqoopSplit, NullWritable> createRecordReader(
- InputSplit split, TaskAttemptContext context) {
- return new SqoopRecordReader();
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
-
- String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
- Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
-
- PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
- Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
-
- List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob);
- List<InputSplit> splits = new LinkedList<InputSplit>();
- for (Partition partition : partitions) {
- LOG.debug("Partition: " + partition);
- SqoopSplit split = new SqoopSplit();
- split.setPartition(partition);
- splits.add(split);
- }
-
- return splits;
- }
-
- public static class SqoopRecordReader
- extends RecordReader<SqoopSplit, NullWritable> {
-
- private boolean delivered = false;
- private SqoopSplit split = null;
-
- @Override
- public boolean nextKeyValue() {
- if (delivered) {
- return false;
- } else {
- delivered = true;
- return true;
- }
- }
-
- @Override
- public SqoopSplit getCurrentKey() {
- return split;
- }
-
- @Override
- public NullWritable getCurrentValue() {
- return NullWritable.get();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public float getProgress() {
- return delivered ? 1.0f : 0.0f;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) {
- this.split = (SqoopSplit)split;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
deleted file mode 100644
index 6892b4b..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * Mapper whose run() method hands the partition of the current
- * {@link SqoopSplit} key to the configured {@link Extractor} and forwards
- * every record the extractor writes to the map output as a {@link Data} key
- * with a {@link NullWritable} value.
- */
-public class SqoopMapper
-    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
-
-  public static final Log LOG = LogFactory.getLog(SqoopMapper.class.getName());
-
-  /**
-   * Instantiates the extractor class named by
-   * {@code JobConstants.JOB_ETL_EXTRACTOR} and runs it with the connector
-   * context, the connector connection/job objects read from the job
-   * configuration, the current split's partition and a {@link MapDataWriter}.
-   *
-   * @param context the Hadoop mapper context
-   * @throws SqoopException with {@code CoreError.CORE_0017} wrapping any
-   *         exception raised while the extractor runs
-   */
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    Configuration configuration = context.getConfiguration();
-
-    Extractor extractor = (Extractor) ClassUtils.instantiate(
-        configuration.get(JobConstants.JOB_ETL_EXTRACTOR));
-
-    PrefixContext connectorContext =
-        new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connection = ConfigurationUtils.getConnectorConnection(configuration);
-    Object job = ConfigurationUtils.getConnectorJob(configuration);
-
-    SqoopSplit currentSplit = context.getCurrentKey();
-
-    try {
-      extractor.run(
-          connectorContext,
-          connection,
-          job,
-          currentSplit.getPartition(),
-          new MapDataWriter(context));
-    } catch (Exception e) {
-      throw new SqoopException(CoreError.CORE_0017, e);
-    }
-  }
-
-  /**
-   * {@link DataWriter} that emits each record through the mapper context.
-   * A single {@link Data} instance is created on first use and reused for
-   * every record.
-   */
-  public class MapDataWriter extends DataWriter {
-    private Context context;
-    private Data data;
-
-    public MapDataWriter(Context context) {
-      this.context = context;
-    }
-
-    /** Applies the delimiter to the (lazily created) shared Data holder. */
-    @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      if (null == data) {
-        data = new Data();
-      }
-      data.setFieldDelimiter(fieldDelimiter);
-    }
-
-    /** Emits the array as a record of type {@code Data.ARRAY_RECORD}. */
-    @Override
-    public void writeArrayRecord(Object[] array) {
-      writeContent(array, Data.ARRAY_RECORD);
-    }
-
-    /** Emits the string as a record of type {@code Data.CSV_RECORD}. */
-    @Override
-    public void writeCsvRecord(String csv) {
-      writeContent(csv, Data.CSV_RECORD);
-    }
-
-    /**
-     * Stores the content in the shared Data holder and writes the holder to
-     * the mapper context.
-     *
-     * @throws SqoopException with {@code CoreError.CORE_0013} wrapping any
-     *         exception raised by the context write
-     */
-    @Override
-    public void writeContent(Object content, int type) {
-      if (null == data) {
-        data = new Data();
-      }
-      data.setContent(content, type);
-
-      try {
-        context.write(data, NullWritable.get());
-      } catch (Exception e) {
-        throw new SqoopException(CoreError.CORE_0013, e);
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
deleted file mode 100644
index 1242f90..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * Output format that writes nothing to the file system itself: records are
- * passed to the record writer obtained from a
- * {@link SqoopOutputFormatLoadExecutor}, and the committer is a no-op.
- */
-public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
-
-  public static final Log LOG = LogFactory.getLog(SqoopNullOutputFormat.class.getName());
-
-  /** No output specification is validated. */
-  @Override
-  public void checkOutputSpecs(JobContext context) {
-    // intentionally empty
-  }
-
-  /**
-   * Creates a fresh load executor for this task attempt and returns the
-   * record writer it exposes.
-   */
-  @Override
-  public RecordWriter<Data, NullWritable> getRecordWriter(
-      TaskAttemptContext context) {
-    return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
-  }
-
-  /** Returns a committer whose callbacks all do nothing. */
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-    return new NullOutputCommitter();
-  }
-
-  /**
-   * Committer with empty job/task callbacks that never requests a task
-   * commit.
-   */
-  class NullOutputCommitter extends OutputCommitter {
-    @Override
-    public void setupJob(JobContext jobContext) {
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskContext) {
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return false;
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskContext) {
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskContext) {
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
deleted file mode 100644
index 96e1533..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * Couples a Hadoop {@link RecordWriter} (the producer, called by the task
- * thread) with a {@link Loader} running in its own thread (the consumer)
- * through a single shared {@link Data} slot.
- *
- * <p>The monitor of {@code data} guards the slot and both "finished" flags.
- * Every wait on that monitor happens in a loop that re-checks its condition,
- * so spurious wakeups and wakeups caused by the other side finishing are
- * handled. A failure of the loader is recorded and reported to the writer
- * instead of being lost in the consumer thread.
- */
-public class SqoopOutputFormatLoadExecutor {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
-
-  // readerFinished, writerFinished and the content of data are only touched
-  // while holding the monitor of data.
-  private boolean readerFinished;
-  private boolean writerFinished;
-  private Data data;
-  private JobContext context;
-  private SqoopRecordWriter producer;
-  private ConsumerThread consumer;
-
-  public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
-    data = new Data();
-    context = jobctx;
-    producer = new SqoopRecordWriter();
-    consumer = new ConsumerThread();
-  }
-
-  /**
-   * Starts the consumer (loader) thread as a daemon and returns the writer
-   * that feeds it.
-   */
-  public RecordWriter<Data, NullWritable> getRecordWriter() {
-    consumer.setDaemon(true);
-    consumer.start();
-    return producer;
-  }
-
-  /**
-   * Common handling for an interrupted writer; the caller must hold the
-   * monitor of data. Marks the writer as finished, wakes the reader,
-   * restores the interrupt flag and builds the exception to throw.
-   */
-  private SqoopException writerInterrupted(InterruptedException e) {
-    writerFinished = true;
-    data.notifyAll();
-    Thread.currentThread().interrupt();
-    return new SqoopException(CoreError.CORE_0015, e);
-  }
-
-  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
-
-    /**
-     * Places one record into the shared slot, blocking until the reader has
-     * taken the previous one.
-     *
-     * @throws SqoopException the loader's recorded failure if the reader has
-     *         already finished, or {@code CoreError.CORE_0015} if the wait
-     *         was interrupted
-     */
-    @Override
-    public void write(Data key, NullWritable value) {
-      synchronized (data) {
-        try {
-          // loop: wait() may return spuriously or because the reader died
-          while (!readerFinished && !data.isEmpty()) {
-            data.wait();
-          }
-        } catch (InterruptedException e) {
-          throw writerInterrupted(e);
-        }
-
-        if (readerFinished) {
-          // nobody will consume the record; surface the reader's failure
-          consumer.checkException();
-          return;
-        }
-
-        int type = key.getType();
-        data.setContent(key.getContent(type), type);
-
-        // notify reader that the data is ready
-        data.notifyAll();
-      }
-    }
-
-    /**
-     * Waits until the last record has been taken, signals end of input and
-     * then waits for the loader thread to finish so that a late loader
-     * failure is still reported to the caller.
-     *
-     * @throws SqoopException the loader's recorded failure, or
-     *         {@code CoreError.CORE_0015} if the wait was interrupted
-     */
-    @Override
-    public void close(TaskAttemptContext context) {
-      synchronized (data) {
-        try {
-          while (!readerFinished && !data.isEmpty()) {
-            data.wait();
-          }
-
-          // inform reader that there is no more input
-          writerFinished = true;
-          data.notifyAll();
-
-          // the consumer is a daemon thread: do not return before it is done
-          while (!readerFinished) {
-            data.wait();
-          }
-        } catch (InterruptedException e) {
-          throw writerInterrupted(e);
-        }
-
-        consumer.checkException();
-      }
-    }
-  }
-
-  public class OutputFormatDataReader extends DataReader {
-    @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      data.setFieldDelimiter(fieldDelimiter);
-    }
-
-    @Override
-    public Object[] readArrayRecord() {
-      return (Object[])readContent(Data.ARRAY_RECORD);
-    }
-
-    @Override
-    public String readCsvRecord() {
-      return (String)readContent(Data.CSV_RECORD);
-    }
-
-    /**
-     * Takes the next record out of the shared slot, blocking until the
-     * writer has produced one.
-     *
-     * @return the record content in the requested representation, or
-     *         {@code null} once the writer has finished
-     * @throws SqoopException {@code CoreError.CORE_0016} if the wait was
-     *         interrupted
-     */
-    @Override
-    public Object readContent(int type) {
-      synchronized (data) {
-        try {
-          // loop: the wakeup may come from close() with nothing in the slot
-          while (!writerFinished && data.isEmpty()) {
-            data.wait();
-          }
-        } catch (InterruptedException e) {
-          // inform writer that reader is finished and unlock it
-          readerFinished = true;
-          data.notifyAll();
-          Thread.currentThread().interrupt();
-          throw new SqoopException(CoreError.CORE_0016, e);
-        }
-
-        if (writerFinished) {
-          return null;
-        }
-
-        Object content = data.getContent(type);
-        data.setContent(null, Data.EMPTY_DATA);
-
-        // notify writer that data is consumed
-        data.notifyAll();
-
-        return content;
-      }
-    }
-  }
-
-  public class ConsumerThread extends Thread {
-    // written once by run() under the data monitor; volatile so that
-    // checkException() is also safe when called without that monitor
-    private volatile SqoopException exception = null;
-
-    /** Rethrows the failure recorded by run(), if there is one. */
-    public void checkException() {
-      SqoopException recorded = exception;
-      if (recorded != null) {
-        throw recorded;
-      }
-    }
-
-    /**
-     * Instantiates the loader named by {@code JobConstants.JOB_ETL_LOADER}
-     * and runs it against the shared slot. Whatever the outcome, the reader
-     * is marked finished and the writer is woken up. A failure
-     * ({@code CORE_0018} for an error in the loader, {@code CORE_0019} when
-     * the loader returned before the writer finished) is recorded for
-     * checkException() and logged.
-     */
-    @Override
-    public void run() {
-      SqoopException failure = null;
-
-      try {
-        DataReader reader = new OutputFormatDataReader();
-
-        Configuration conf = context.getConfiguration();
-
-        String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
-        Loader loader = (Loader) ClassUtils.instantiate(loaderName);
-
-        // The framework context is the configuration with an empty prefix
-        PrefixContext frameworkContext = new PrefixContext(conf, "");
-
-        loader.run(frameworkContext, reader);
-      } catch (Throwable t) {
-        failure = new SqoopException(CoreError.CORE_0018, t);
-      }
-
-      synchronized (data) {
-        if (failure == null && !writerFinished) {
-          // loader returned although data are not all consumed
-          failure = new SqoopException(CoreError.CORE_0019);
-        }
-        exception = failure;
-
-        // inform writer that reader is finished and unlock it
-        readerFinished = true;
-        data.notifyAll();
-      }
-
-      if (failure != null) {
-        LOG.error("Loader thread finished with an error", failure);
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
deleted file mode 100644
index d236148..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * Reducer over {@link Data} keys and {@link NullWritable} values. It
- * overrides nothing, so the behaviour inherited from {@link Reducer} applies.
- */
-public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> {
-
-  public static final Log LOG = LogFactory.getLog(SqoopReducer.class.getName());
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
deleted file mode 100644
index 7dc9541..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * Input split that wraps one connector {@link Partition}. It is serialized
- * as the partition's class name followed by the partition's own content.
- */
-public class SqoopSplit extends InputSplit implements Writable {
-
-  private Partition partition;
-
-  public Partition getPartition() {
-    return partition;
-  }
-
-  public void setPartition(Partition partition) {
-    this.partition = partition;
-  }
-
-  /** Always reports a length of zero. */
-  @Override
-  public long getLength() throws IOException {
-    return 0;
-  }
-
-  /** Always reports an empty list of locations. */
-  @Override
-  public String[] getLocations() throws IOException {
-    return new String[0];
-  }
-
-  /**
-   * Writes the class name of the wrapped partition, then lets the partition
-   * write its own fields.
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    String partitionClassName = partition.getClass().getName();
-    out.writeUTF(partitionClassName);
-    partition.write(out);
-  }
-
-  /**
-   * Reads a partition class name, creates an instance of that class through
-   * its no-argument constructor and lets the instance read its own fields.
-   *
-   * @throws SqoopException {@code CoreError.CORE_0009} when the class cannot
-   *         be loaded, {@code CoreError.CORE_0010} when it cannot be
-   *         instantiated
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    String partitionClassName = in.readUTF();
-
-    Class<?> partitionClass = ClassUtils.loadClass(partitionClassName);
-    if (null == partitionClass) {
-      throw new SqoopException(CoreError.CORE_0009, partitionClassName);
-    }
-
-    try {
-      partition = (Partition) partitionClass.newInstance();
-    } catch (Exception e) {
-      throw new SqoopException(CoreError.CORE_0010, partitionClassName, e);
-    }
-
-    partition.readFields(in);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/io/TestData.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/core/src/test/java/org/apache/sqoop/io/TestData.java
deleted file mode 100644
index 9fe9d41..0000000
--- a/core/src/test/java/org/apache/sqoop/io/TestData.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.io;
-
-import java.util.Arrays;
-
-import junit.framework.TestCase;
-
-import org.apache.sqoop.job.io.Data;
-import org.junit.Test;
-
-public class TestData extends TestCase {
-
-  private static final double TEST_NUMBER = Math.PI + 100;
-
-  /**
-   * Checks the CSV text that Data produces for an array record, once with a
-   * string containing a quote and a comma and once with a null field.
-   */
-  @Test
-  public void testArrayToCsv() throws Exception {
-    Data data = new Data();
-
-    // the first two fields and the trailing byte array are the same in both cases
-    String leadingFields =
-        Long.valueOf((long)TEST_NUMBER) + "," + Double.valueOf(TEST_NUMBER) + ",";
-    String trailingField = "," + Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-
-    // with special characters:
-    String expected =
-        leadingFields + "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + trailingField;
-    String actual = toCsv(data, String.valueOf(TEST_NUMBER) + "',s");
-    assertEquals(expected, actual);
-
-    // with null characters:
-    expected = leadingFields + "null" + trailingField;
-    actual = toCsv(data, null);
-    assertEquals(expected, actual);
-  }
-
-  /**
-   * Stores a four-field array record (long, double, the given third field,
-   * byte array) in the holder and reads it back as a CSV record.
-   */
-  private static String toCsv(Data data, Object thirdField) {
-    Object[] record = new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        thirdField,
-        new byte[] {1, 2, 3, 4, 5} };
-    data.setContent(record, Data.ARRAY_RECORD);
-    return (String)data.getContent(Data.CSV_RECORD);
-  }
-
-  /**
-   * Equality assertion that compares byte arrays by their
-   * Arrays.toString() text and delegates everything else to TestCase.
-   */
-  public static void assertEquals(Object expected, Object actual) {
-    if (!(expected instanceof byte[])) {
-      TestCase.assertEquals(expected, actual);
-      return;
-    }
-    assertEquals(Arrays.toString((byte[])expected),
-        Arrays.toString((byte[])actual));
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
deleted file mode 100644
index e685883..0000000
--- a/core/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Static helpers used by tests for basic file operations through the Hadoop
- * FileSystem API. Every call resolves the file system of the given path
- * with a freshly created Configuration.
- */
-public class FileUtils {
-
-  private FileUtils() {
-    // static helpers only; no instances
-  }
-
-  /** Tells whether the given path exists. */
-  public static boolean exists(String file) throws IOException {
-    Path target = new Path(file);
-    return target.getFileSystem(new Configuration()).exists(target);
-  }
-
-  /** Recursively deletes the given path if it exists; otherwise does nothing. */
-  public static void delete(String file) throws IOException {
-    Path target = new Path(file);
-    FileSystem fileSystem = target.getFileSystem(new Configuration());
-    if (!fileSystem.exists(target)) {
-      return;
-    }
-    fileSystem.delete(target, true);
-  }
-
-  /** Creates the given directory unless it already exists. */
-  public static void mkdirs(String directory) throws IOException {
-    Path target = new Path(directory);
-    FileSystem fileSystem = target.getFileSystem(new Configuration());
-    if (fileSystem.exists(target)) {
-      return;
-    }
-    fileSystem.mkdirs(target);
-  }
-
-  /** Opens the named file for reading. */
-  public static InputStream open(String fileName)
-      throws IOException, ClassNotFoundException {
-    Path target = new Path(fileName);
-    return target.getFileSystem(new Configuration()).open(target);
-  }
-
-  /** Creates the named file for writing, with the overwrite flag set to false. */
-  public static OutputStream create(String fileName) throws IOException {
-    Path target = new Path(fileName);
-    return target.getFileSystem(new Configuration()).create(target, false);
-  }
-
-}