You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:01:16 UTC
[13/19] SQOOP-1376: Sqoop2: From/To: Refactor connector interface
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
index 475f41c..908b44d 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
@@ -35,6 +35,8 @@ public class Constants {
// Options
public static final String OPT_XID = "xid";
+ public static final String OPT_FXID = "fxid";
+ public static final String OPT_TXID = "txid";
public static final String OPT_ALL = "all";
public static final String OPT_JID = "jid";
public static final String OPT_CID = "cid";
@@ -54,6 +56,8 @@ public class Constants {
public static final String OPT_DETAIL = "detail";
public static final char OPT_XID_CHAR = 'x';
+ public static final char OPT_FXID_CHAR = 'f';
+ public static final char OPT_TXID_CHAR = 't';
public static final char OPT_ALL_CHAR = 'a';
public static final char OPT_JID_CHAR = 'j';
public static final char OPT_CID_CHAR = 'c';
@@ -143,12 +147,14 @@ public class Constants {
"args.function.unknown";
public static final String RES_ARGS_XID_MISSING =
"args.xid_missing";
+ public static final String RES_ARGS_FXID_MISSING =
+ "args.fxid_missing";
+ public static final String RES_ARGS_TXID_MISSING =
+ "args.txid_missing";
public static final String RES_ARGS_JID_MISSING =
"args.jid_missing";
public static final String RES_ARGS_CID_MISSING =
"args.cid_missing";
- public static final String RES_ARGS_TYPE_MISSING =
- "args.type_missing";
public static final String RES_ARGS_NAME_MISSING =
"args.name_missing";
public static final String RES_ARGS_VALUE_MISSING =
@@ -160,8 +166,6 @@ public class Constants {
"prompt.job_id";
public static final String RES_CONNECTOR_ID =
"prompt.connector_id";
- public static final String RES_PROMPT_JOB_TYPE =
- "prompt.job_type";
public static final String RES_PROMPT_UPDATE_CONN_METADATA =
"prompt.update_conn_metadata";
public static final String RES_PROMPT_UPDATE_JOB_METADATA =
@@ -375,10 +379,12 @@ public class Constants {
"table.header.version";
public static final String RES_TABLE_HEADER_CLASS =
"table.header.class";
- public static final String RES_TABLE_HEADER_TYPE =
- "table.header.type";
public static final String RES_TABLE_HEADER_CONNECTOR =
"table.header.connector";
+ public static final String RES_TABLE_HEADER_FROM_CONNECTOR =
+ "table.header.connector.from";
+ public static final String RES_TABLE_HEADER_TO_CONNECTOR =
+ "table.header.connector.to";
public static final String RES_TABLE_HEADER_JOB_ID =
"table.header.jid";
public static final String RES_TABLE_HEADER_EXTERNAL_ID =
@@ -390,14 +396,10 @@ public class Constants {
public static final String RES_TABLE_HEADER_ENABLED =
"table.header.enabled";
- public static final String RES_FORMDISPLAYER_SUPPORTED_JOBTYPE =
- "formdisplayer.supported_job_types";
public static final String RES_FORMDISPLAYER_CONNECTION =
"formdisplayer.connection";
public static final String RES_FORMDISPLAYER_JOB =
"formdisplayer.job";
- public static final String RES_FORMDISPLAYER_FORM_JOBTYPE =
- "formdisplayer.forms_jobtype";
public static final String RES_FORMDISPLAYER_FORM =
"formdisplayer.form";
public static final String RES_FORMDISPLAYER_NAME =
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
index 56e0b4e..44196e6 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
@@ -18,9 +18,11 @@
package org.apache.sqoop.shell.utils;
import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MAccountableEntity;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MEnumInput;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
@@ -28,7 +30,6 @@ import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MInputType;
import org.apache.sqoop.model.MIntegerInput;
import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.shell.core.Constants;
@@ -49,21 +50,34 @@ public final class FormDisplayer {
public static void displayFormMetadataDetails(MFramework framework,
ResourceBundle bundle) {
- print(" %s: ", resourceString(Constants.RES_FORMDISPLAYER_SUPPORTED_JOBTYPE));
- println(framework.getAllJobsForms().keySet().toString());
-
displayFormsMetadata(
framework.getConnectionForms().getForms(),
resourceString(Constants.RES_FORMDISPLAYER_CONNECTION),
bundle);
- for (MJobForms jobForms : framework.getAllJobsForms().values()) {
- print(" %s ", resourceString(Constants.RES_FORMDISPLAYER_FORM_JOBTYPE));
- print(jobForms.getType().name());
- println(":");
+ displayFormsMetadata(
+ framework.getJobForms().getForms(),
+ resourceString(Constants.RES_FORMDISPLAYER_JOB),
+ bundle);
+ }
- displayFormsMetadata(jobForms.getForms(), resourceString(Constants.RES_FORMDISPLAYER_JOB), bundle);
- }
+ public static void displayFormMetadataDetails(MConnector connector,
+ ResourceBundle bundle) {
+ displayFormsMetadata(
+ connector.getConnectionForms().getForms(),
+ resourceString(Constants.RES_FORMDISPLAYER_CONNECTION),
+ bundle);
+
+ // @TODO(Abe): Validate From/To output is correct.
+ displayFormsMetadata(
+ connector.getJobForms(ConnectorType.FROM).getForms(),
+ resourceString(Constants.RES_FORMDISPLAYER_JOB),
+ bundle);
+
+ displayFormsMetadata(
+ connector.getJobForms(ConnectorType.TO).getForms(),
+ resourceString(Constants.RES_FORMDISPLAYER_JOB),
+ bundle);
}
public static void displayFormsMetadata(List<MForm> forms,
@@ -139,8 +153,9 @@ public final class FormDisplayer {
formList.addAll(connection.getFrameworkPart().getForms());
} else if(entity instanceof MJob) {
MJob job = (MJob) entity;
- formList.addAll(job.getConnectorPart().getForms());
+ formList.addAll(job.getConnectorPart(ConnectorType.FROM).getForms());
formList.addAll(job.getFrameworkPart().getForms());
+ formList.addAll(job.getConnectorPart(ConnectorType.TO).getForms());
}
for(MForm form : formList) {
if(form.getValidationStatus() == Status.ACCEPTABLE) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
index c491ae5..cc75d94 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
@@ -21,6 +21,7 @@ import jline.ConsoleReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MEnumInput;
@@ -55,7 +56,7 @@ public final class FormFiller {
/**
* Fill job object based on CLI options.
*
- * @param reader Associated console reader object
+ * @param line Parsed command line holding the CLI option values
* @param job Job that user is suppose to fill in
* @return True if we filled all inputs, false if user has stopped processing
* @throws IOException
@@ -68,7 +69,7 @@ public final class FormFiller {
// Fill in data from user
return fillForms(line,
- job.getConnectorPart().getForms(),
+ job.getConnectorPart(ConnectorType.FROM).getForms(),
job.getFrameworkPart().getForms());
}
@@ -77,25 +78,28 @@ public final class FormFiller {
*
* @param reader Associated console reader object
* @param job Job that user is suppose to fill in
- * @param connectorBundle Connector resource bundle
+ * @param fromConnectorBundle Resource bundle of the From connector
* @param frameworkBundle Framework resource bundle
* @return True if we filled all inputs, false if user has stopped processing
* @throws IOException
*/
public static boolean fillJob(ConsoleReader reader,
MJob job,
- ResourceBundle connectorBundle,
- ResourceBundle frameworkBundle)
+ ResourceBundle fromConnectorBundle,
+ ResourceBundle frameworkBundle,
+ ResourceBundle toConnectorBundle)
throws IOException {
job.setName(getName(reader, job.getName()));
// Fill in data from user
return fillForms(reader,
- job.getConnectorPart().getForms(),
- connectorBundle,
+ job.getConnectorPart(ConnectorType.FROM).getForms(),
+ fromConnectorBundle,
job.getFrameworkPart().getForms(),
- frameworkBundle);
+ frameworkBundle,
+ job.getConnectorPart(ConnectorType.TO).getForms(),
+ toConnectorBundle);
}
/**
@@ -387,8 +391,7 @@ public final class FormFiller {
List<MForm> connectorForms,
ResourceBundle connectorBundle,
List<MForm> frameworkForms,
- ResourceBundle frameworkBundle
- ) throws IOException {
+ ResourceBundle frameworkBundle) throws IOException {
// Query connector forms
@@ -400,6 +403,32 @@ public final class FormFiller {
if(!fillForms(frameworkForms, reader, frameworkBundle)) {
return false;
}
+ return true;
+ }
+
+ public static boolean fillForms(ConsoleReader reader,
+ List<MForm> fromConnectorForms,
+ ResourceBundle fromConnectorBundle,
+ List<MForm> frameworkForms,
+ ResourceBundle frameworkBundle,
+ List<MForm> toConnectorForms,
+ ResourceBundle toConnectorBundle) throws IOException {
+
+
+ // From connector forms
+ if(!fillForms(fromConnectorForms, reader, fromConnectorBundle)) {
+ return false;
+ }
+
+ // Query framework forms
+ if(!fillForms(frameworkForms, reader, frameworkBundle)) {
+ return false;
+ }
+
+ // To connector forms
+ if(!fillForms(toConnectorForms, reader, toConnectorBundle)) {
+ return false;
+ }
return true;
}
@@ -880,7 +909,7 @@ public final class FormFiller {
}
public static void printJobValidationMessages(MJob job) {
- for (MForm form : job.getConnectorPart().getForms()) {
+ for (MForm form : job.getConnectorPart(ConnectorType.FROM).getForms()) {
for (MInput<?> input : form.getInputs()) {
printValidationMessage(input, true);
}
@@ -890,6 +919,11 @@ public final class FormFiller {
printValidationMessage(input, true);
}
}
+ for (MForm form : job.getConnectorPart(ConnectorType.TO).getForms()) {
+ for (MInput<?> input : form.getInputs()) {
+ printValidationMessage(input, true);
+ }
+ }
}
private FormFiller() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
index aa118e1..40a4e33 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.shell.utils;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MJob;
/**
@@ -34,11 +35,14 @@ public class JobDynamicFormOptions extends DynamicFormOptions<MJob> {
.withLongOpt("name")
.hasArg()
.create());
- for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart().getForms())) {
+ for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.FROM).getForms())) {
this.addOption(option);
}
for (Option option : FormOptions.getFormsOptions("framework", job.getFrameworkPart().getForms())) {
this.addOption(option);
}
+ for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.TO).getForms())) {
+ this.addOption(option);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/shell/src/main/resources/shell-resource.properties
----------------------------------------------------------------------
diff --git a/shell/src/main/resources/shell-resource.properties b/shell/src/main/resources/shell-resource.properties
index df9457d..d4c782e 100644
--- a/shell/src/main/resources/shell-resource.properties
+++ b/shell/src/main/resources/shell-resource.properties
@@ -30,9 +30,10 @@ object-name.help = Non unique name of the entity to help you remember \
#
args.function.unknown = The specified function "{0}" is not recognized.
args.xid_missing = Required argument --xid is missing.
+args.fxid_missing = Required argument --fxid is missing.
+args.txid_missing = Required argument --txid is missing.
args.jid_missing = Required argument --jid is missing.
args.cid_missing = Required argument --cid is missing.
-args.type_missing = Required argument --type is missing.
args.name_missing = Required argument --name is missing.
args.value_missing = Required argument --value is missing.
@@ -79,7 +80,7 @@ create.job_successful = New job was successfully created with validation \
status {0} and persistent id {1}
## Creating messages
create.creating_conn = Creating connection for connector with id {0}
-create.creating_job = Creating job for connection with id {0}
+create.creating_job = Creating job for connections with id {0} and {1}
#
# Delete command
@@ -193,8 +194,9 @@ table.header.id = Id
table.header.name = Name
table.header.version = Version
table.header.class = Class
-table.header.type = Type
table.header.connector = Connector
+table.header.connector.from = From Connector
+table.header.connector.to = To Connector
table.header.jid = Job Id
table.header.eid = External Id
table.header.status = Status
@@ -205,7 +207,6 @@ table.header.enabled = Enabled
formdisplayer.supported_job_types = Supported job types
formdisplayer.connection = Connection
formdisplayer.job = Job
-formdisplayer.forms_jobtype = Forms for job type
formdisplayer.form = form
formdisplayer.name = Name
formdisplayer.label = Label
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 50eb940..7081b4c 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -20,11 +20,11 @@ package org.apache.sqoop.connector.spi;
import java.util.Locale;
import java.util.ResourceBundle;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
-import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.validation.Validator;
/**
@@ -53,17 +53,17 @@ public abstract class SqoopConnector {
/**
* @return Get job configuration class for given type or null if not supported
*/
- public abstract Class getJobConfigurationClass(MJob.Type jobType);
+ public abstract Class getJobConfigurationClass(ConnectorType jobType);
/**
- * @return an <tt>Importer</tt> that provides classes for performing import.
+ * @return a <tt>From</tt> that provides classes for performing import.
*/
- public abstract Importer getImporter();
+ public abstract From getFrom();
/**
- * @return an <tt>Exporter</tt> that provides classes for performing export.
+ * @return a <tt>To</tt> that provides classes for performing export.
*/
- public abstract Exporter getExporter();
+ public abstract To getTo();
/**
* Returns validation object that Sqoop framework can use to validate user
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
deleted file mode 100644
index cdaa623..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * This specifies classes that perform connector-defined steps
- * within export execution:
- * Initializer
- * -> (framework-defined steps)
- * -> Loader
- * -> Destroyer
- */
-public class Exporter extends CallbackBase {
-
- private Class<? extends Loader> loader;
-
- public Exporter(
- Class<? extends Initializer> initializer,
- Class<? extends Loader> loader,
- Class<? extends Destroyer> destroyer
- ) {
- super(initializer, destroyer);
- this.loader = loader;
- }
-
- public Class<? extends Loader> getLoader() {
- return loader;
- }
-
- @Override
- public String toString() {
- return "Exporter{" + super.toString() +
- ", loader=" + loader +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/job/etl/From.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
new file mode 100644
index 0000000..9b8d76f
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Holds the connector-defined callback classes for the From side of a
+ * job (this class replaces the former Importer). Steps run in order:
+ * Initializer
+ * -> Partitioner
+ * -> Extractor
+ * -> (framework-defined steps)
+ * -> Destroyer
+ */
+public class From extends CallbackBase {
+
+ private final Class<? extends Partitioner> partitioner;
+ private final Class<? extends Extractor> extractor;
+
+ public From(Class<? extends Initializer> initializer,
+ Class<? extends Partitioner> partitioner,
+ Class<? extends Extractor> extractor,
+ Class<? extends Destroyer> destroyer) {
+ super(initializer, destroyer);
+ this.partitioner = partitioner;
+ this.extractor = extractor;
+ }
+
+ public Class<? extends Partitioner> getPartitioner() {
+ return partitioner;
+ }
+
+ public Class<? extends Extractor> getExtractor() {
+ return extractor;
+ }
+
+ @Override
+ public String toString() {
+ return "From{" + super.toString() +
+ ", partitioner=" + partitioner.getName() +
+ ", extractor=" + extractor.getName() +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
deleted file mode 100644
index d4c9e70..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * This specifies classes that perform connector-defined steps
- * within import execution:
- * Initializer
- * -> Partitioner
- * -> Extractor
- * -> (framework-defined steps)
- * -> Destroyer
- */
-public class Importer extends CallbackBase {
-
- private Class<? extends Partitioner> partitioner;
- private Class<? extends Extractor> extractor;
-
- public Importer(Class<? extends Initializer> initializer,
- Class<? extends Partitioner> partitioner,
- Class<? extends Extractor> extractor,
- Class<? extends Destroyer> destroyer) {
- super(initializer, destroyer);
- this.partitioner = partitioner;
- this.extractor = extractor;
- }
-
- public Class<? extends Partitioner> getPartitioner() {
- return partitioner;
- }
-
- public Class<? extends Extractor> getExtractor() {
- return extractor;
- }
-
- @Override
- public String toString() {
- return "Importer{" + super.toString() +
- ", partitioner=" + partitioner.getName() +
- ", extractor=" + extractor.getName() +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/job/etl/To.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
new file mode 100644
index 0000000..a791945
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Holds the connector-defined callback classes for the To side of a
+ * job (this class replaces the former Exporter). Steps run in order:
+ * Initializer
+ * -> (framework-defined steps)
+ * -> Loader
+ * -> Destroyer
+ */
+public class To extends CallbackBase {
+
+ private final Class<? extends Loader> loader;
+
+ public To(
+ Class<? extends Initializer> initializer,
+ Class<? extends Loader> loader,
+ Class<? extends Destroyer> destroyer
+ ) {
+ super(initializer, destroyer);
+ this.loader = loader;
+ }
+
+ public Class<? extends Loader> getLoader() {
+ return loader;
+ }
+
+ @Override
+ public String toString() {
+ return "To{" + super.toString() +
+ ", loader=" + loader +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/spi/src/main/java/org/apache/sqoop/validation/Validator.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
index cf0b4aa..9b791f8 100644
--- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java
+++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
@@ -40,11 +40,10 @@ public class Validator {
/**
* Validate configuration object for job .
*
- * @param type Type of jobs that being validated
* @param jobConfiguration Job to be validated
* @return Validation status
*/
- public Validation validateJob(MJob.Type type, Object jobConfiguration) {
+ public Validation validateJob(Object jobConfiguration) {
return new Validation(EmptyClass.class);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index a05274a..3c21421 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -19,15 +19,14 @@ package org.apache.sqoop.submission.mapreduce;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
@@ -155,9 +154,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
// Clone global configuration
Configuration configuration = new Configuration(globalConfiguration);
- // Serialize job type as it will be needed by underlying execution engine
- ConfigurationUtils.setJobType(configuration, request.getJobType());
-
// Serialize framework context into job configuration
for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
if (entry.getValue() == null) {
@@ -168,16 +164,26 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
// Serialize connector context as a sub namespace
- for(Map.Entry<String, String> entry :request.getConnectorContext()) {
+ for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.FROM)) {
if (entry.getValue() == null) {
LOG.warn("Ignoring null connector context value for key " + entry.getKey());
continue;
}
configuration.set(
- JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(),
+ JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
entry.getValue());
}
+ for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.TO)) {
+ if (entry.getValue() == null) {
+ LOG.warn("Ignoring null connector context value for key " + entry.getKey());
+ continue;
+ }
+ configuration.set(
+ JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
+ entry.getValue());
+ }
+
// Set up notification URL if it's available
if(request.getNotificationUrl() != null) {
configuration.set("job.end.notification.url", request.getNotificationUrl());
@@ -194,9 +200,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
Job job = new Job(configuration);
// And finally put all configuration objects to credentials cache
- ConfigurationUtils.setConfigConnectorConnection(job, request.getConfigConnectorConnection());
- ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
- ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
+ ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.FROM, job, request.getConnectorConnectionConfig(ConnectorType.FROM));
+ ConfigurationUtils.setConnectorJobConfig(ConnectorType.FROM, job, request.getConnectorJobConfig(ConnectorType.FROM));
+ ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.TO, job, request.getConnectorConnectionConfig(ConnectorType.TO));
+ ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO));
+ ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
+ ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
@@ -212,11 +221,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
job.setMapOutputKeyClass(request.getMapOutputKeyClass());
job.setMapOutputValueClass(request.getMapOutputValueClass());
- String outputDirectory = request.getOutputDirectory();
- if(outputDirectory != null) {
- FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
- }
-
// Set number of reducers as number of configured loaders or suppress
// reduce phase entirely if loaders are not set at all.
if(request.getLoaders() != null) {