You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:20 UTC
[2/5] SQOOP-1376: Sqoop2: From/To: Refactor connector interface
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
index 1978ec6..e5bb564 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -42,161 +42,161 @@ import org.apache.sqoop.job.io.Data;
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
-public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
- public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
-
- private Configuration conf;
- private DataWriter dataWriter;
- private long rowRead = 0;
-
- private final char fieldDelimiter;
-
- public HdfsExportExtractor() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
- @Override
- public void extract(ExtractorContext context,
- ConnectionConfiguration connectionConfiguration,
- ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
- conf = ((PrefixContext) context.getContext()).getConfiguration();
- dataWriter = context.getDataWriter();
- dataWriter.setFieldDelimiter(fieldDelimiter);
-
- try {
- HdfsExportPartition p = partition;
- LOG.info("Working on partition: " + p);
- int numFiles = p.getNumberOfFiles();
- for (int i = 0; i < numFiles; i++) {
- extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
- }
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
- }
- }
-
- private void extractFile(Path file, long start, long length)
- throws IOException {
- long end = start + length;
- LOG.info("Extracting file " + file);
- LOG.info("\t from offset " + start);
- LOG.info("\t to offset " + end);
- LOG.info("\t of length " + length);
- if(isSequenceFile(file)) {
- extractSequenceFile(file, start, length);
- } else {
- extractTextFile(file, start, length);
- }
- }
-
- /**
- * Extracts Sequence file
- * @param file
- * @param start
- * @param length
- * @throws IOException
- */
- private void extractSequenceFile(Path file, long start, long length)
- throws IOException {
- LOG.info("Extracting sequence file");
- long end = start + length;
- SequenceFile.Reader filereader = new SequenceFile.Reader(
- file.getFileSystem(conf), file, conf);
-
- if (start > filereader.getPosition()) {
- filereader.sync(start); // sync to start
- }
-
- Text line = new Text();
- boolean hasNext = filereader.next(line);
- while (hasNext) {
- rowRead++;
- dataWriter.writeCsvRecord(line.toString());
- line = new Text();
- hasNext = filereader.next(line);
- if (filereader.getPosition() >= end && filereader.syncSeen()) {
- break;
- }
- }
- filereader.close();
- }
-
- /**
- * Extracts Text file
- * @param file
- * @param start
- * @param length
- * @throws IOException
- */
- private void extractTextFile(Path file, long start, long length)
- throws IOException {
- LOG.info("Extracting text file");
- long end = start + length;
- FileSystem fs = file.getFileSystem(conf);
- FSDataInputStream filestream = fs.open(file);
- CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
- LineReader filereader;
- Seekable fileseeker = filestream;
-
- // Hadoop 1.0 does not have support for custom record delimiter and thus
- // we
- // are supporting only default one.
- // We might add another "else if" case for SplittableCompressionCodec once
- // we drop support for Hadoop 1.0.
- if (codec == null) {
- filestream.seek(start);
- filereader = new LineReader(filestream);
- } else {
- filereader = new LineReader(codec.createInputStream(filestream,
- codec.createDecompressor()), conf);
- fileseeker = filestream;
- }
- if (start != 0) {
- // always throw away first record because
- // one extra line is read in previous split
- start += filereader.readLine(new Text(), 0);
- }
- int size;
- LOG.info("Start position: " + String.valueOf(start));
- long next = start;
- while (next <= end) {
- Text line = new Text();
- size = filereader.readLine(line, Integer.MAX_VALUE);
- if (size == 0) {
- break;
- }
- if (codec == null) {
- next += size;
- } else {
- next = fileseeker.getPos();
- }
- rowRead++;
- dataWriter.writeCsvRecord(line.toString());
- }
- LOG.info("Extracting ended on position: " + fileseeker.getPos());
- filestream.close();
- }
-
- @Override
- public long getRowsRead() {
- return rowRead;
- }
-
- /**
- * Returns true if given file is sequence
- * @param file
- * @return boolean
- */
- private boolean isSequenceFile(Path file) {
- SequenceFile.Reader filereader = null;
- try {
- filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
- filereader.close();
- } catch (IOException e) {
- return false;
- }
- return true;
- }
-}
+//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
+//
+// public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
+//
+// private Configuration conf;
+// private DataWriter dataWriter;
+// private long rowRead = 0;
+//
+// private final char fieldDelimiter;
+//
+// public HdfsExportExtractor() {
+// fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+// }
+//
+// @Override
+// public void extract(ExtractorContext context,
+// ConnectionConfiguration connectionConfiguration,
+// ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
+//
+// conf = ((PrefixContext) context.getContext()).getConfiguration();
+// dataWriter = context.getDataWriter();
+// dataWriter.setFieldDelimiter(fieldDelimiter);
+//
+// try {
+// HdfsExportPartition p = partition;
+// LOG.info("Working on partition: " + p);
+// int numFiles = p.getNumberOfFiles();
+// for (int i = 0; i < numFiles; i++) {
+// extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+// }
+// } catch (IOException e) {
+// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+// }
+// }
+//
+// private void extractFile(Path file, long start, long length)
+// throws IOException {
+// long end = start + length;
+// LOG.info("Extracting file " + file);
+// LOG.info("\t from offset " + start);
+// LOG.info("\t to offset " + end);
+// LOG.info("\t of length " + length);
+// if(isSequenceFile(file)) {
+// extractSequenceFile(file, start, length);
+// } else {
+// extractTextFile(file, start, length);
+// }
+// }
+//
+// /**
+// * Extracts Sequence file
+// * @param file
+// * @param start
+// * @param length
+// * @throws IOException
+// */
+// private void extractSequenceFile(Path file, long start, long length)
+// throws IOException {
+// LOG.info("Extracting sequence file");
+// long end = start + length;
+// SequenceFile.Reader filereader = new SequenceFile.Reader(
+// file.getFileSystem(conf), file, conf);
+//
+// if (start > filereader.getPosition()) {
+// filereader.sync(start); // sync to start
+// }
+//
+// Text line = new Text();
+// boolean hasNext = filereader.next(line);
+// while (hasNext) {
+// rowRead++;
+// dataWriter.writeCsvRecord(line.toString());
+// line = new Text();
+// hasNext = filereader.next(line);
+// if (filereader.getPosition() >= end && filereader.syncSeen()) {
+// break;
+// }
+// }
+// filereader.close();
+// }
+//
+// /**
+// * Extracts Text file
+// * @param file
+// * @param start
+// * @param length
+// * @throws IOException
+// */
+// private void extractTextFile(Path file, long start, long length)
+// throws IOException {
+// LOG.info("Extracting text file");
+// long end = start + length;
+// FileSystem fs = file.getFileSystem(conf);
+// FSDataInputStream filestream = fs.open(file);
+// CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+// LineReader filereader;
+// Seekable fileseeker = filestream;
+//
+// // Hadoop 1.0 does not have support for custom record delimiter and thus
+// // we
+// // are supporting only default one.
+// // We might add another "else if" case for SplittableCompressionCodec once
+// // we drop support for Hadoop 1.0.
+// if (codec == null) {
+// filestream.seek(start);
+// filereader = new LineReader(filestream);
+// } else {
+// filereader = new LineReader(codec.createInputStream(filestream,
+// codec.createDecompressor()), conf);
+// fileseeker = filestream;
+// }
+// if (start != 0) {
+// // always throw away first record because
+// // one extra line is read in previous split
+// start += filereader.readLine(new Text(), 0);
+// }
+// int size;
+// LOG.info("Start position: " + String.valueOf(start));
+// long next = start;
+// while (next <= end) {
+// Text line = new Text();
+// size = filereader.readLine(line, Integer.MAX_VALUE);
+// if (size == 0) {
+// break;
+// }
+// if (codec == null) {
+// next += size;
+// } else {
+// next = fileseeker.getPos();
+// }
+// rowRead++;
+// dataWriter.writeCsvRecord(line.toString());
+// }
+// LOG.info("Extracting ended on position: " + fileseeker.getPos());
+// filestream.close();
+// }
+//
+// @Override
+// public long getRowsRead() {
+// return rowRead;
+// }
+//
+// /**
+// * Returns true if given file is sequence
+// * @param file
+// * @return boolean
+// */
+// private boolean isSequenceFile(Path file) {
+// SequenceFile.Reader filereader = null;
+// try {
+// filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+// filereader.close();
+// } catch (IOException e) {
+// return false;
+// }
+// return true;
+// }
+//}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index bd11323..c60ae68 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -22,10 +22,10 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.json.util.SchemaSerialization;
import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
import org.json.simple.JSONObject;
@@ -40,59 +40,59 @@ import java.util.Properties;
*/
public final class ConfigurationUtils {
- private static final String JOB_TYPE = JobConstants.PREFIX_JOB_CONFIG + "type";
+ private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.connection";
- private static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.connection";
+ private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.connection";
- private static final String JOB_CONFIG_CLASS_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.job";
+ private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
- private static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.connection";
+ private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
+
+ private static final String JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.from.connection";
+
+ private static final String JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.to.connection";
private static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.job";
- private static final String JOB_CONFIG_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.connection";
+ private static final String JOB_CONFIG_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.connection";
+
+ private static final Text JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_CONNECTION);
+
+ private static final String JOB_CONFIG_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.connection";
+
+ private static final Text JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_CONNECTION);
+
+ private static final String JOB_CONFIG_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
+
+ private static final Text JOB_CONFIG_FROM_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_JOB);
- private static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
+ private static final String JOB_CONFIG_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
- private static final String JOB_CONFIG_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.job";
+ private static final Text JOB_CONFIG_TO_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_JOB);
- private static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_CONNECTOR_JOB);
+ private static final String JOB_CONFIG_FROM_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.from.connection";
- private static final String JOB_CONFIG_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.connection";
+ private static final Text JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION);
- private static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
+ private static final String JOB_CONFIG_TO_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.to.connection"; // must differ from the FROM key, otherwise both directions share one credentials entry
+
+ private static final Text JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_TO_FRAMEWORK_CONNECTION);
private static final String JOB_CONFIG_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.framework.job";
private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
- private static final String SCHEMA_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector";
+ private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
- private static final Text SCHEMA_CONNECTOR_KEY = new Text(SCHEMA_CONNECTOR);
+ private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR);
- private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
+ private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
- private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
+ private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
- /**
- * Persist job type in the configuration object.
- *
- * @param configuration MapReduce configuration object
- * @param type Job type
- */
- public static void setJobType(Configuration configuration, MJob.Type type) {
- configuration.set(JOB_TYPE, type.name());
- }
+ private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
- /**
- * Retrieve job type.
- *
- * @param configuration MapReduce configuration object
- * @return Job type
- */
- public static MJob.Type getJobType(Configuration configuration) {
- return MJob.Type.valueOf(configuration.get(JOB_TYPE));
- }
+ private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
/**
* Persist Connector configuration object for connection.
@@ -100,20 +100,38 @@ public final class ConfigurationUtils {
* @param job MapReduce job object
* @param obj Configuration object
*/
- public static void setConfigConnectorConnection(Job job, Object obj) {
- job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, obj.getClass().getName());
- job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ public static void setConnectorConnectionConfig(ConnectorType type, Job job, Object obj) {
+ switch (type) {
+ case FROM:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+
+ case TO:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+ }
}
/**
- * Persist Connector configuration object for job.
+ * Persist Connector configuration objects for job.
*
* @param job MapReduce job object
* @param obj Configuration object
*/
- public static void setConfigConnectorJob(Job job, Object obj) {
- job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_JOB, obj.getClass().getName());
- job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+ public static void setConnectorJobConfig(ConnectorType type, Job job, Object obj) {
+ switch (type) {
+ case FROM:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+
+ case TO:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+ }
}
/**
@@ -122,9 +140,18 @@ public final class ConfigurationUtils {
* @param job MapReduce job object
* @param obj Configuration object
*/
- public static void setConfigFrameworkConnection(Job job, Object obj) {
- job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, obj.getClass().getName());
- job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ public static void setFrameworkConnectionConfig(ConnectorType type, Job job, Object obj) {
+ switch (type) {
+ case FROM:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+
+ case TO:
+ job.getConfiguration().set(JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ break;
+ }
}
/**
@@ -144,8 +171,16 @@ public final class ConfigurationUtils {
* @param configuration MapReduce configuration object
* @return Configuration object
*/
- public static Object getConfigConnectorConnection(Configuration configuration) {
- return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
+ public static Object getConnectorConnectionConfig(ConnectorType type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY);
+
+ case TO:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY);
+ }
+
+ return null;
}
/**
@@ -154,8 +189,16 @@ public final class ConfigurationUtils {
* @param configuration MapReduce configuration object
* @return Configuration object
*/
- public static Object getConfigConnectorJob(Configuration configuration) {
- return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_JOB, JOB_CONFIG_CONNECTOR_JOB_KEY);
+ public static Object getConnectorJobConfig(ConnectorType type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, JOB_CONFIG_FROM_CONNECTOR_JOB_KEY);
+
+ case TO:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, JOB_CONFIG_TO_CONNECTOR_JOB_KEY);
+ }
+
+ return null;
}
/**
@@ -164,8 +207,16 @@ public final class ConfigurationUtils {
* @param configuration MapReduce configuration object
* @return Configuration object
*/
- public static Object getConfigFrameworkConnection(Configuration configuration) {
- return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+ public static Object getFrameworkConnectionConfig(ConnectorType type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY);
+
+ case TO:
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY);
+ }
+
+ return null;
}
/**
@@ -179,47 +230,57 @@ public final class ConfigurationUtils {
}
/**
- * Persist Connector generated schema.
+ * Persist From Connector generated schema.
*
* @param job MapReduce Job object
* @param schema Schema
*/
- public static void setConnectorSchema(Job job, Schema schema) {
+ public static void setFromConnectorSchema(Job job, Schema schema) {
if(schema != null) {
- job.getCredentials().addSecretKey(SCHEMA_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
}
}
/**
- * Persist Framework generated schema.
+ * Persist To Connector generated schema.
*
* @param job MapReduce Job object
* @param schema Schema
*/
- public static void setHioSchema(Job job, Schema schema) {
+ public static void setToConnectorSchema(Job job, Schema schema) {
if(schema != null) {
- job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
}
}
/**
- * Retrieve Connector generated schema.
+ * Persist Framework generated schema.
*
- * @param configuration MapReduce configuration object
- * @return Schema
+ * @param job MapReduce Job object
+ * @param schema Schema
*/
- public static Schema getConnectorSchema(Configuration configuration) {
- return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_CONNECTOR_KEY));
+ public static void setHioSchema(Job job, Schema schema) {
+ if(schema != null) {
+ job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ }
}
/**
- * Retrieve Framework generated schema.
+ * Retrieve Connector generated schema for the given connector type (From or To).
*
* @param configuration MapReduce configuration object
* @return Schema
*/
- public static Schema getHioSchema(Configuration configuration) {
- return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_HIO_KEY));
+ public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) {
+ switch (type) {
+ case FROM:
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY));
+
+ case TO:
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY));
+ }
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index e1a95a7..b4e9c2b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -19,10 +19,12 @@ package org.apache.sqoop.job.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
@@ -51,18 +53,18 @@ public class SqoopDestroyerExecutor {
}
// Objects that should be pass to the Destroyer execution
- PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- Object configConnection = ConfigurationUtils.getConfigConnectorConnection(configuration);
- Object configJob = ConfigurationUtils.getConfigConnectorJob(configuration);
+ PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, configuration);
+ Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, configuration);
// Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between HIO and Connector schema
- Schema schema = ConfigurationUtils.getConnectorSchema(configuration);
+ // TODO: Change to conditional choosing between Connector schemas.
+ Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());
- destroyer.destroy(destroyerContext, configConnection, configJob);
+ destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
}
private SqoopDestroyerExecutor() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 6891258..4bd7bce 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
@@ -36,6 +37,7 @@ import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
@@ -61,10 +63,10 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
- PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- Object connectorConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
- Object connectorJob = ConfigurationUtils.getConfigConnectorJob(conf);
- Schema schema = ConfigurationUtils.getConnectorSchema(conf);
+ PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
+ Object connectorJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf);
+ Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 92de37e..34fe4f2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
@@ -34,6 +35,7 @@ import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
@@ -66,24 +68,13 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
Object configJob = null;
// Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between HIO and Connector schema
- Schema schema = ConfigurationUtils.getConnectorSchema(conf);
-
- // Executor is in connector space for IMPORT and in framework space for EXPORT
- switch (ConfigurationUtils.getJobType(conf)) {
- case IMPORT:
- subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
- configJob = ConfigurationUtils.getConfigConnectorJob(conf);
- break;
- case EXPORT:
- subContext = new PrefixContext(conf, "");
- configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
- configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
- break;
- default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
- }
+ // TODO: Change to conditional choosing between Connector schemas.
+ Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
+ // Get configs for extractor
+ subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
+ configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf);
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 7dedee9..0487d29 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
@@ -38,6 +39,7 @@ import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
@@ -202,23 +204,13 @@ public class SqoopOutputFormatLoadExecutor {
if (!isTest) {
// Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between HIO and Connector schema
- schema = ConfigurationUtils.getConnectorSchema(conf);
-
- switch (ConfigurationUtils.getJobType(conf)) {
- case EXPORT:
- subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
- configJob = ConfigurationUtils.getConfigConnectorJob(conf);
- break;
- case IMPORT:
- subContext = new PrefixContext(conf, "");
- configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
- configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
- break;
- default:
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
- }
+ // TODO: Change to conditional choosing between Connector schemas.
+ // @TODO(Abe): Maybe use TO schema?
+ schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
+ subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+ configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.TO, conf);
+ configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.TO, conf);
}
// Create loader context
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 3c870be..a5f9d6c 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.job.etl.HdfsExportExtractor;
+//import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.Loader;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 5bce3a9..2359a06 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -40,8 +40,8 @@ import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
@@ -117,11 +117,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
registerForms(null, null, mf.getConnectionForms().getForms(),
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
- // Register all jobs
- for (MJobForms jobForms : mf.getAllJobsForms().values()) {
- registerForms(null, jobForms.getType(), jobForms.getForms(),
- MFormType.JOB.name(), baseFormStmt, baseInputStmt);
- }
+ // Register job forms
+ registerForms(null, null, mf.getJobForms().getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex);
@@ -153,10 +151,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
// Register all jobs
- for (MJobForms jobForms : mc.getAllJobsForms().values()) {
- registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
- MFormType.JOB.name(), baseFormStmt, baseInputStmt);
- }
+ registerForms(connectorId, ConnectorType.FROM, mc.getJobForms(ConnectorType.FROM).getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+ registerForms(connectorId, ConnectorType.TO, mc.getJobForms(ConnectorType.TO).getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
@@ -513,10 +511,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
// Register all jobs
- for (MJobForms jobForms : mf.getAllJobsForms().values()) {
- registerForms(null, jobForms.getType(), jobForms.getForms(),
- MFormType.JOB.name(), baseFormStmt, baseInputStmt);
- }
+ registerForms(null, null, mf.getJobForms().getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
// We're using hardcoded value for framework metadata as they are
// represented as NULL in the database.
@@ -544,8 +540,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
List<MForm> connectionForms = new ArrayList<MForm>();
- Map<MJob.Type, List<MForm>> jobForms =
- new HashMap<MJob.Type, List<MForm>>();
+ List<MForm> jobForms = new ArrayList<MForm>();
loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
@@ -555,7 +550,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
mf = new MFramework(new MConnectionForms(connectionForms),
- convertToJobList(jobForms), detectFrameworkVersion(conn));
+ new MJobForms(jobForms), detectFrameworkVersion(conn));
// We're using hardcoded value for framework metadata as they are
// represented as NULL in the database.
@@ -931,8 +926,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
stmt = conn.prepareStatement(STMT_INSERT_JOB,
Statement.RETURN_GENERATED_KEYS);
stmt.setString(1, job.getName());
- stmt.setLong(2, job.getConnectionId());
- stmt.setString(3, job.getType().name());
+ stmt.setLong(2, job.getConnectionId(ConnectorType.FROM));
+ stmt.setLong(3, job.getConnectionId(ConnectorType.TO));
stmt.setBoolean(4, job.getEnabled());
stmt.setString(5, job.getCreationUser());
stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime()));
@@ -955,12 +950,16 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
createInputValues(STMT_INSERT_JOB_INPUT,
jobId,
- job.getConnectorPart().getForms(),
+ job.getConnectorPart(ConnectorType.FROM).getForms(),
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
jobId,
job.getFrameworkPart().getForms(),
conn);
+ createInputValues(STMT_INSERT_JOB_INPUT,
+ jobId,
+ job.getConnectorPart(ConnectorType.TO).getForms(),
+ conn);
job.setPersistenceId(jobId);
@@ -997,12 +996,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
// And reinsert new values
createInputValues(STMT_INSERT_JOB_INPUT,
job.getPersistenceId(),
- job.getConnectorPart().getForms(),
+ job.getConnectorPart(ConnectorType.FROM).getForms(),
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
- job.getPersistenceId(),
- job.getFrameworkPart().getForms(),
- conn);
+ job.getPersistenceId(),
+ job.getFrameworkPart().getForms(),
+ conn);
} catch (SQLException ex) {
logException(ex, job);
@@ -1620,14 +1619,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
formFetchStmt.setLong(1, connectorId);
List<MForm> connectionForms = new ArrayList<MForm>();
- Map<MJob.Type, List<MForm>> jobForms =
- new HashMap<MJob.Type, List<MForm>>();
+ Map<ConnectorType, List<MForm>> jobForms = new HashMap<ConnectorType, List<MForm>>();
- loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
+ loadConnectorForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
- new MConnectionForms(connectionForms),
- convertToJobList(jobForms));
+ new MConnectionForms(connectionForms),
+ new MJobForms(jobForms.get(ConnectorType.FROM)),
+ new MJobForms(jobForms.get(ConnectorType.TO)));
mc.setPersistenceId(connectorId);
connectors.add(mc);
@@ -1674,13 +1673,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>();
+ List<MForm> frameworkJobForms = new ArrayList<MForm>();
+ Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
- Map<MJob.Type, List<MForm>> connectorJobForms
- = new HashMap<MJob.Type, List<MForm>>();
- Map<MJob.Type, List<MForm>> frameworkJobForms
- = new HashMap<MJob.Type, List<MForm>>();
-
- loadForms(connectorConnForms, connectorJobForms,
+ loadConnectorForms(connectorConnForms, connectorJobForms,
formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
@@ -1725,20 +1721,19 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
while(rsJob.next()) {
- long connectorId = rsJob.getLong(1);
- long id = rsJob.getLong(2);
- String name = rsJob.getString(3);
- long connectionId = rsJob.getLong(4);
- String stringType = rsJob.getString(5);
- boolean enabled = rsJob.getBoolean(6);
- String createBy = rsJob.getString(7);
- Date creationDate = rsJob.getTimestamp(8);
- String updateBy = rsJob.getString(9);
- Date lastUpdateDate = rsJob.getTimestamp(10);
-
- MJob.Type type = MJob.Type.valueOf(stringType);
-
- formConnectorFetchStmt.setLong(1, connectorId);
+ long fromConnectorId = rsJob.getLong(1);
+ long toConnectorId = rsJob.getLong(2);
+ long id = rsJob.getLong(3);
+ String name = rsJob.getString(4);
+ long fromConnectionId = rsJob.getLong(5);
+ long toConnectionId = rsJob.getLong(6);
+ boolean enabled = rsJob.getBoolean(7);
+ String createBy = rsJob.getString(8);
+ Date creationDate = rsJob.getTimestamp(9);
+ String updateBy = rsJob.getString(10);
+ Date lastUpdateDate = rsJob.getTimestamp(11);
+
+ formConnectorFetchStmt.setLong(1, fromConnectorId);
inputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
@@ -1746,20 +1741,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>();
+ List<MForm> frameworkJobForms = new ArrayList<MForm>();
+ Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
- Map<MJob.Type, List<MForm>> connectorJobForms
- = new HashMap<MJob.Type, List<MForm>>();
- Map<MJob.Type, List<MForm>> frameworkJobForms
- = new HashMap<MJob.Type, List<MForm>>();
-
- loadForms(connectorConnForms, connectorJobForms,
- formConnectorFetchStmt, inputFetchStmt, 2);
+ loadConnectorForms(connectorConnForms, connectorJobForms,
+ formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
- MJob job = new MJob(connectorId, connectionId, type,
- new MJobForms(type, connectorJobForms.get(type)),
- new MJobForms(type, frameworkJobForms.get(type)));
+ MJob job = new MJob(
+ fromConnectorId, toConnectorId,
+ fromConnectionId, toConnectionId,
+ new MJobForms(connectorJobForms.get(ConnectorType.FROM)),
+ new MJobForms(connectorJobForms.get(ConnectorType.TO)),
+ new MJobForms(frameworkJobForms));
job.setPersistenceId(id);
job.setName(name);
@@ -1773,8 +1768,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
} finally {
closeResultSets(rsJob);
- closeStatements(formConnectorFetchStmt,
- formFrameworkFetchStmt, inputFetchStmt);
+ closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt);
}
return jobs;
@@ -1791,23 +1785,25 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @param type
* @param baseFormStmt
* @param baseInputStmt
+ * @return short number of forms registered.
* @throws SQLException
*/
- private void registerForms(Long connectorId, MJob.Type jobType,
+ private short registerForms(Long connectorId, ConnectorType connectorType,
List<MForm> forms, String type, PreparedStatement baseFormStmt,
PreparedStatement baseInputStmt)
throws SQLException {
short formIndex = 0;
+
for (MForm form : forms) {
if(connectorId == null) {
baseFormStmt.setNull(1, Types.BIGINT);
} else {
baseFormStmt.setLong(1, connectorId);
}
- if(jobType == null) {
+ if(connectorType == null) {
baseFormStmt.setNull(2, Types.VARCHAR);
} else {
- baseFormStmt.setString(2, jobType.name());
+ baseFormStmt.setString(2, connectorType.name());
}
baseFormStmt.setString(3, form.getName());
baseFormStmt.setString(4, type);
@@ -1830,6 +1826,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MInput<?>> inputs = form.getInputs();
registerFormInputs(formId, inputs, baseInputStmt);
}
+ return formIndex;
}
/**
@@ -1921,7 +1918,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @throws SQLException In case of any failure on Derby side
*/
public void loadForms(List<MForm> connectionForms,
- Map<MJob.Type, List<MForm>> jobForms,
+ List<MForm> jobForms,
PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt,
int formPosition) throws SQLException {
@@ -2022,20 +2019,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
connectionForms.add(mf);
break;
case JOB:
- MJob.Type jobType = MJob.Type.valueOf(operation);
- if (!jobForms.containsKey(jobType)) {
- jobForms.put(jobType, new ArrayList<MForm>());
- }
-
- if (jobForms.get(jobType).size() != formIndex) {
+ if (jobForms.size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector-" + formConnectorId
+ "; form: " + mf
+ "; index: " + formIndex
- + "; expected: " + jobForms.get(jobType).size()
+ + "; expected: " + jobForms.size()
);
}
- jobForms.get(jobType).add(mf);
+ jobForms.add(mf);
break;
default:
throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
@@ -2044,17 +2036,141 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
- public List<MJobForms> convertToJobList(Map<MJob.Type, List<MForm>> l) {
- List<MJobForms> ret = new ArrayList<MJobForms>();
+ /**
+ * Load forms and corresponding inputs from Derby database.
+ *
+ * Use given prepared statements to load all forms and corresponding inputs
+ * from Derby.
+ *
+ * @param connectionForms List of connection forms that will be filled up
+ * @param jobForms Map with job forms that will be filled up
+ * @param formFetchStmt Prepared statement for fetching forms
+ * @param inputFetchStmt Prepared statement for fetching inputs
+ * @throws SQLException In case of any failure on Derby side
+ */
+ public void loadConnectorForms(List<MForm> connectionForms,
+ Map<ConnectorType, List<MForm>> jobForms,
+ PreparedStatement formFetchStmt,
+ PreparedStatement inputFetchStmt,
+ int formPosition) throws SQLException {
- for (Map.Entry<MJob.Type, List<MForm>> entry : l.entrySet()) {
- MJob.Type type = entry.getKey();
- List<MForm> forms = entry.getValue();
+ // Get list of structures from database
+ ResultSet rsetForm = formFetchStmt.executeQuery();
+ while (rsetForm.next()) {
+ long formId = rsetForm.getLong(1);
+ Long formConnectorId = rsetForm.getLong(2);
+ String operation = rsetForm.getString(3);
+ String formName = rsetForm.getString(4);
+ String formType = rsetForm.getString(5);
+ int formIndex = rsetForm.getInt(6);
+ List<MInput<?>> formInputs = new ArrayList<MInput<?>>();
- ret.add(new MJobForms(type, forms));
- }
+ MForm mf = new MForm(formName, formInputs);
+ mf.setPersistenceId(formId);
- return ret;
+ inputFetchStmt.setLong(formPosition, formId);
+
+ ResultSet rsetInput = inputFetchStmt.executeQuery();
+ while (rsetInput.next()) {
+ long inputId = rsetInput.getLong(1);
+ String inputName = rsetInput.getString(2);
+ long inputForm = rsetInput.getLong(3);
+ short inputIndex = rsetInput.getShort(4);
+ String inputType = rsetInput.getString(5);
+ boolean inputSensitivity = rsetInput.getBoolean(6);
+ short inputStrLength = rsetInput.getShort(7);
+ String inputEnumValues = rsetInput.getString(8);
+ String value = rsetInput.getString(9);
+
+ MInputType mit = MInputType.valueOf(inputType);
+
+ MInput input = null;
+ switch (mit) {
+ case STRING:
+ input = new MStringInput(inputName, inputSensitivity, inputStrLength);
+ break;
+ case MAP:
+ input = new MMapInput(inputName, inputSensitivity);
+ break;
+ case BOOLEAN:
+ input = new MBooleanInput(inputName, inputSensitivity);
+ break;
+ case INTEGER:
+ input = new MIntegerInput(inputName, inputSensitivity);
+ break;
+ case ENUM:
+ input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
+ break;
+ default:
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
+ "input-" + inputName + ":" + inputId + ":"
+ + "form-" + inputForm + ":" + mit.name());
+ }
+
+ // Set persistent ID
+ input.setPersistenceId(inputId);
+
+ // Set value
+ if(value == null) {
+ input.setEmpty();
+ } else {
+ input.restoreFromUrlSafeValueString(value);
+ }
+
+ if (mf.getInputs().size() != inputIndex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
+ "form: " + mf
+ + "; input: " + input
+ + "; index: " + inputIndex
+ + "; expected: " + mf.getInputs().size()
+ );
+ }
+
+ mf.getInputs().add(input);
+ }
+
+ if (mf.getInputs().size() == 0) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
+ "connector-" + formConnectorId
+ + "; form: " + mf
+ );
+ }
+
+ MFormType mft = MFormType.valueOf(formType);
+ switch (mft) {
+ case CONNECTION:
+ if (connectionForms.size() != formIndex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
+ "connector-" + formConnectorId
+ + "; form: " + mf
+ + "; index: " + formIndex
+ + "; expected: " + connectionForms.size()
+ );
+ }
+ connectionForms.add(mf);
+ break;
+ case JOB:
+ ConnectorType type = ConnectorType.valueOf(operation);
+ if (!jobForms.containsKey(type)) {
+ jobForms.put(type, new ArrayList<MForm>());
+ }
+
+ if (jobForms.get(type).size() != formIndex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
+ "connector-" + formConnectorId
+ + "; form: " + mf
+ + "; index: " + formIndex
+ + "; expected: " + jobForms.get(type).size()
+ );
+ }
+
+ jobForms.get(type).add(mf);
+ break;
+ default:
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
+ "connector-" + formConnectorId + ":" + mf);
+ }
+ }
}
private void createInputValues(String query,
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index fcbb475..1a77360 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -144,9 +144,9 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQB_NAME = "SQB_NAME";
- public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
+ public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION";
- public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
+ public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION";
public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 7042a53..e5bb2e7 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -286,13 +286,13 @@ public final class DerbySchemaQuery {
public static final String QUERY_CREATE_TABLE_SQ_JOB =
"CREATE TABLE " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
- + COLUMN_SQB_CONNECTION + " BIGINT, "
+ + COLUMN_SQB_FROM_CONNECTION + " BIGINT, "
+ + COLUMN_SQB_TO_CONNECTION + " BIGINT, "
+ COLUMN_SQB_NAME + " VARCHAR(64), "
- + COLUMN_SQB_TYPE + " VARCHAR(64),"
+ COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
+ COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
+ "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
- + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
+ + "FOREIGN KEY(" + COLUMN_SQB_FROM_CONNECTION + ") "
+ "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+ ")";
@@ -702,8 +702,8 @@ public final class DerbySchemaQuery {
public static final String STMT_INSERT_JOB =
"INSERT INTO " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_NAME + ", "
- + COLUMN_SQB_CONNECTION + ", "
- + COLUMN_SQB_TYPE + ", "
+ + COLUMN_SQB_FROM_CONNECTION + ", "
+ + COLUMN_SQB_TO_CONNECTION + ", "
+ COLUMN_SQB_ENABLED + ", "
+ COLUMN_SQB_CREATION_USER + ", "
+ COLUMN_SQB_CREATION_DATE + ", "
@@ -753,43 +753,49 @@ public final class DerbySchemaQuery {
+ " count(*)"
+ " FROM " + TABLE_SQ_JOB
+ " JOIN " + TABLE_SQ_CONNECTION
- + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
+ " WHERE " + COLUMN_SQN_ID + " = ? ";
// DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE =
"SELECT "
- + COLUMN_SQN_CONNECTOR + ", "
- + COLUMN_SQB_ID + ", "
- + COLUMN_SQB_NAME + ", "
- + COLUMN_SQB_CONNECTION + ", "
- + COLUMN_SQB_TYPE + ", "
- + COLUMN_SQB_ENABLED + ", "
- + COLUMN_SQB_CREATION_USER + ", "
- + COLUMN_SQB_CREATION_DATE + ", "
- + COLUMN_SQB_UPDATE_USER + ", "
- + COLUMN_SQB_UPDATE_DATE
- + " FROM " + TABLE_SQ_JOB
+ + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ + "job." + COLUMN_SQB_ID + ", "
+ + "job." + COLUMN_SQB_NAME + ", "
+ + "job." + COLUMN_SQB_FROM_CONNECTION + ", "
+ + "job." + COLUMN_SQB_TO_CONNECTION + ", "
+ + "job." + COLUMN_SQB_ENABLED + ", "
+ + "job." + COLUMN_SQB_CREATION_USER + ", "
+ + "job." + COLUMN_SQB_CREATION_DATE + ", "
+ + "job." + COLUMN_SQB_UPDATE_USER + ", "
+ + "job." + COLUMN_SQB_UPDATE_DATE
+ + " FROM " + TABLE_SQ_JOB + " AS job"
+ + " LEFT JOIN " + TABLE_SQ_CONNECTION
+ + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
- + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
+ " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select all jobs
public static final String STMT_SELECT_JOB_ALL =
"SELECT "
- + COLUMN_SQN_CONNECTOR + ", "
- + COLUMN_SQB_ID + ", "
- + COLUMN_SQB_NAME + ", "
- + COLUMN_SQB_CONNECTION + ", "
- + COLUMN_SQB_TYPE + ", "
- + COLUMN_SQB_ENABLED + ", "
- + COLUMN_SQB_CREATION_USER + ", "
- + COLUMN_SQB_CREATION_DATE + ", "
- + COLUMN_SQB_UPDATE_USER + ", "
- + COLUMN_SQB_UPDATE_DATE
- + " FROM " + TABLE_SQ_JOB
+ + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ + "job." + COLUMN_SQB_ID + ", "
+ + "job." + COLUMN_SQB_NAME + ", "
+ + "job." + COLUMN_SQB_FROM_CONNECTION + ", "
+ + "job." + COLUMN_SQB_TO_CONNECTION + ", "
+ + "job." + COLUMN_SQB_ENABLED + ", "
+ + "job." + COLUMN_SQB_CREATION_USER + ", "
+ + "job." + COLUMN_SQB_CREATION_DATE + ", "
+ + "job." + COLUMN_SQB_UPDATE_USER + ", "
+ + "job." + COLUMN_SQB_UPDATE_DATE
+ + " FROM " + TABLE_SQ_JOB + " AS job"
+ + " LEFT JOIN " + TABLE_SQ_CONNECTION
+ + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
- + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
+ + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID;
// DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
@@ -797,8 +803,8 @@ public final class DerbySchemaQuery {
+ COLUMN_SQN_CONNECTOR + ", "
+ COLUMN_SQB_ID + ", "
+ COLUMN_SQB_NAME + ", "
- + COLUMN_SQB_CONNECTION + ", "
- + COLUMN_SQB_TYPE + ", "
+ + COLUMN_SQB_FROM_CONNECTION + ", "
+ + COLUMN_SQB_TO_CONNECTION + ", "
+ COLUMN_SQB_ENABLED + ", "
+ COLUMN_SQB_CREATION_USER + ", "
+ COLUMN_SQB_CREATION_DATE + ", "
@@ -806,7 +812,7 @@ public final class DerbySchemaQuery {
+ COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
- + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+ + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
+ " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
// DML: Insert new submission
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
index c9c7648..2721846 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
@@ -24,8 +24,8 @@ import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.ValidationBean;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
@@ -204,8 +204,8 @@ public class ConnectionRequestHandler implements RequestHandler {
frameworkValidation.getStatus());
// Return back validations in all cases
- ValidationBean outputBean =
- new ValidationBean(connectorValidation, frameworkValidation);
+ ConnectionValidationBean outputBean =
+ new ConnectionValidationBean(connectorValidation, frameworkValidation);
// If we're good enough let's perform the action
if(finalStatus.canProceed()) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 362ba79..473bb46 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -19,13 +19,14 @@ package org.apache.sqoop.handler;
import org.apache.log4j.Logger;
import org.apache.sqoop.audit.AuditLoggerManager;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobValidationBean;
import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.ValidationBean;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
@@ -163,47 +164,59 @@ public class JobRequestHandler implements RequestHandler {
MJob job = jobs.get(0);
// Verify that user is not trying to spoof us
- MJobForms connectorForms
- = ConnectorManager.getInstance().getConnectorMetadata(job.getConnectorId())
- .getJobForms(job.getType());
+ MJobForms fromConnectorForms = ConnectorManager.getInstance()
+ .getConnectorMetadata(job.getConnectorId(ConnectorType.FROM))
+ .getJobForms(ConnectorType.FROM);
+ MJobForms toConnectorForms = ConnectorManager.getInstance()
+ .getConnectorMetadata(job.getConnectorId(ConnectorType.TO))
+ .getJobForms(ConnectorType.TO);
MJobForms frameworkForms = FrameworkManager.getInstance().getFramework()
- .getJobForms(job.getType());
+ .getJobForms();
- if(!connectorForms.equals(job.getConnectorPart())
- || !frameworkForms.equals(job.getFrameworkPart())) {
+ if(!fromConnectorForms.equals(job.getConnectorPart(ConnectorType.FROM))
+ || !frameworkForms.equals(job.getFrameworkPart())
+ || !toConnectorForms.equals(job.getConnectorPart(ConnectorType.TO))) {
throw new SqoopException(ServerError.SERVER_0003,
"Detected incorrect form structure");
}
// Responsible connector for this session
- SqoopConnector connector =
- ConnectorManager.getInstance().getConnector(job.getConnectorId());
+ SqoopConnector fromConnector =
+ ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM));
+ SqoopConnector toConnector =
+ ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO));
// Get validator objects
- Validator connectorValidator = connector.getValidator();
+ Validator fromConnectorValidator = fromConnector.getValidator();
Validator frameworkValidator = FrameworkManager.getInstance().getValidator();
+ Validator toConnectorValidator = toConnector.getValidator();
// We need translate forms to configuration objects
- Object connectorConfig = ClassUtils.instantiate(
- connector.getJobConfigurationClass(job.getType()));
+ Object fromConnectorConfig = ClassUtils.instantiate(
+ fromConnector.getJobConfigurationClass(ConnectorType.FROM));
Object frameworkConfig = ClassUtils.instantiate(
- FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+ FrameworkManager.getInstance().getJobConfigurationClass());
+ Object toConnectorConfig = ClassUtils.instantiate(
+ toConnector.getJobConfigurationClass(ConnectorType.TO));
- FormUtils.fromForms(job.getConnectorPart().getForms(), connectorConfig);
+ FormUtils.fromForms(job.getConnectorPart(ConnectorType.FROM).getForms(), fromConnectorConfig);
FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkConfig);
+ FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toConnectorConfig);
- // Validate both parts
- Validation connectorValidation =
- connectorValidator.validateJob(job.getType(), connectorConfig);
+ // Validate all parts
+ Validation fromConnectorValidation =
+ fromConnectorValidator.validateJob(fromConnectorConfig);
Validation frameworkValidation =
- frameworkValidator.validateJob(job.getType(), frameworkConfig);
+ frameworkValidator.validateJob(frameworkConfig);
+ Validation toConnectorValidation =
+ toConnectorValidator.validateJob(toConnectorConfig);
- Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(),
- frameworkValidation.getStatus());
+ Status finalStatus = Status.getWorstStatus(fromConnectorValidation.getStatus(),
+ frameworkValidation.getStatus(), toConnectorValidation.getStatus());
// Return back validations in all cases
- ValidationBean outputBean =
- new ValidationBean(connectorValidation, frameworkValidation);
+ JobValidationBean outputBean =
+ new JobValidationBean(fromConnectorValidation, frameworkValidation, toConnectorValidation);
// If we're good enough let's perform the action
if(finalStatus.canProceed()) {
@@ -247,8 +260,9 @@ public class JobRequestHandler implements RequestHandler {
bean = new JobBean(jobs);
// Add associated resources into the bean
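+ // @TODO(Abe): From/To. Only the FROM connector's resource bundle is added below; the TO connector's bundle still needs to be handled.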
for( MJob job : jobs) {
- long connectorId = job.getConnectorId();
+ long connectorId = job.getConnectorId(ConnectorType.FROM);
if(!bean.hasConnectorBundle(connectorId)) {
bean.addConnectorBundle(connectorId,
ConnectorManager.getInstance().getResourceBundle(connectorId, locale));
@@ -258,7 +272,8 @@ public class JobRequestHandler implements RequestHandler {
long jid = Long.valueOf(sjid);
MJob job = repository.findJob(jid);
- long connectorId = job.getConnectorId();
+ // @TODO(Abe): From/To. Only the FROM connector id is looked up here; the TO connector still needs to be handled.
+ long connectorId = job.getConnectorId(ConnectorType.FROM);
bean = new JobBean(job);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
index f80552c..74c863d 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
import jline.ConsoleReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.shell.core.Constants;
@@ -70,8 +71,11 @@ public class CloneJobFunction extends SqoopFunction {
MJob job = client.getJob(jobId);
job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
- ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+ ResourceBundle fromConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.FROM));
ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
+ ResourceBundle toConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.TO));
Status status = Status.FINE;
@@ -88,7 +92,7 @@ public class CloneJobFunction extends SqoopFunction {
}
// Fill in data from user
- if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+ if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
index 598adbc..de246cb 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
import jline.ConsoleReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.shell.core.Constants;
import org.apache.sqoop.shell.utils.FormDisplayer;
@@ -43,26 +44,26 @@ public class CreateJobFunction extends SqoopFunction {
public CreateJobFunction() {
this.addOption(OptionBuilder
.withDescription(resourceString(Constants.RES_PROMPT_CONN_ID))
- .withLongOpt(Constants.OPT_XID)
+ .withLongOpt(Constants.OPT_FXID)
.hasArg()
- .create(Constants.OPT_XID_CHAR)
+ .create(Constants.OPT_FXID_CHAR)
);
this.addOption(OptionBuilder
- .withDescription(resourceString(Constants.RES_PROMPT_JOB_TYPE))
- .withLongOpt(Constants.OPT_TYPE)
+ .withDescription(resourceString(Constants.RES_PROMPT_CONN_ID))
+ .withLongOpt(Constants.OPT_TXID)
.hasArg()
- .create(Constants.OPT_TYPE_CHAR)
+ .create(Constants.OPT_TXID_CHAR)
);
}
@Override
public boolean validateArgs(CommandLine line) {
- if (!line.hasOption(Constants.OPT_XID)) {
- printlnResource(Constants.RES_ARGS_XID_MISSING);
+ if (!line.hasOption(Constants.OPT_FXID)) {
+ printlnResource(Constants.RES_ARGS_FXID_MISSING);
return false;
}
- if (!line.hasOption(Constants.OPT_TYPE)) {
- printlnResource(Constants.RES_ARGS_TYPE_MISSING);
+ if (!line.hasOption(Constants.OPT_TXID)) {
+ printlnResource(Constants.RES_ARGS_TXID_MISSING);
return false;
}
return true;
@@ -71,19 +72,23 @@ public class CreateJobFunction extends SqoopFunction {
@Override
@SuppressWarnings("unchecked")
public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
- return createJob(getLong(line, Constants.OPT_XID),
- line.getOptionValue(Constants.OPT_TYPE),
+ return createJob(getLong(line, Constants.OPT_FXID),
+ getLong(line, Constants.OPT_TXID),
line.getArgList(),
isInteractive);
}
- private Status createJob(Long connectionId, String type, List<String> args, boolean isInteractive) throws IOException {
- printlnResource(Constants.RES_CREATE_CREATING_JOB, connectionId);
+ private Status createJob(Long fromConnectionId, Long toConnectionId, List<String> args, boolean isInteractive) throws IOException {
+ printlnResource(Constants.RES_CREATE_CREATING_JOB, fromConnectionId, toConnectionId);
ConsoleReader reader = new ConsoleReader();
- MJob job = client.newJob(connectionId, MJob.Type.valueOf(type.toUpperCase()));
+ MJob job = client.newJob(fromConnectionId, toConnectionId);
- ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+ // @TODO(Abe): From/To.
+ ResourceBundle fromConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.FROM));
+ ResourceBundle toConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.TO));
ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
Status status = Status.FINE;
@@ -98,7 +103,7 @@ public class CreateJobFunction extends SqoopFunction {
}
// Fill in data from user
- if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+ if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
index 54d8e9a..c345ada 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
@@ -40,7 +40,7 @@ public class DeleteConnectionFunction extends SqoopFunction {
@Override
public boolean validateArgs(CommandLine line) {
- if (!line.hasOption(Constants.OPT_XID)) {
+ if (!line.hasOption(Constants.OPT_FXID)) {
printlnResource(Constants.RES_ARGS_XID_MISSING);
return false;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
index 6e5c9b5..dfaa90e 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
@@ -42,9 +42,9 @@ public class ShowConnectionFunction extends SqoopFunction {
.withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_ALL_CONNS))
.withLongOpt(Constants.OPT_ALL)
.create(Constants.OPT_ALL_CHAR));
- this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_XID)
+ this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_FXID)
.withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_CONN_XID))
- .withLongOpt(Constants.OPT_XID)
+ .withLongOpt(Constants.OPT_FXID)
.create(Constants.OPT_XID_CHAR));
}
@@ -52,8 +52,8 @@ public class ShowConnectionFunction extends SqoopFunction {
public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_ALL)) {
showConnections();
- } else if (line.hasOption(Constants.OPT_XID)) {
- showConnection(getLong(line, Constants.OPT_XID));
+ } else if (line.hasOption(Constants.OPT_FXID)) {
+ showConnection(getLong(line, Constants.OPT_FXID));
} else {
showSummary();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
index 9a5386c..4618211 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.shell;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.shell.core.Constants;
import org.apache.sqoop.shell.utils.TableDisplayer;
@@ -67,25 +68,27 @@ public class ShowJobFunction extends SqoopFunction {
List<String> header = new LinkedList<String>();
header.add(resourceString(Constants.RES_TABLE_HEADER_ID));
header.add(resourceString(Constants.RES_TABLE_HEADER_NAME));
- header.add(resourceString(Constants.RES_TABLE_HEADER_TYPE));
- header.add(resourceString(Constants.RES_TABLE_HEADER_CONNECTOR));
+ header.add(resourceString(Constants.RES_TABLE_HEADER_FROM_CONNECTOR));
+ header.add(resourceString(Constants.RES_TABLE_HEADER_TO_CONNECTOR));
header.add(resourceString(Constants.RES_TABLE_HEADER_ENABLED));
List<String> ids = new LinkedList<String>();
List<String> names = new LinkedList<String>();
- List<String> types = new LinkedList<String>();
- List<String> connectors = new LinkedList<String>();
+ List<String> fromConnectors = new LinkedList<String>();
+ List<String> toConnectors = new LinkedList<String>();
List<String> availabilities = new LinkedList<String>();
for(MJob job : jobs) {
ids.add(String.valueOf(job.getPersistenceId()));
names.add(job.getName());
- types.add(job.getType().toString());
- connectors.add(String.valueOf(job.getConnectorId()));
+ fromConnectors.add(String.valueOf(
+ job.getConnectorId(ConnectorType.FROM)));
+ toConnectors.add(String.valueOf(
+ job.getConnectorId(ConnectorType.TO)));
availabilities.add(String.valueOf(job.getEnabled()));
}
- TableDisplayer.display(header, ids, names, types, connectors, availabilities);
+ TableDisplayer.display(header, ids, names, fromConnectors, toConnectors, availabilities);
}
private void showJobs() {
@@ -118,13 +121,15 @@ public class ShowJobFunction extends SqoopFunction {
formatter.format(job.getLastUpdateDate())
);
printlnResource(Constants.RES_SHOW_PROMPT_JOB_XID_CID_INFO,
- job.getConnectionId(),
- job.getConnectorId());
+ job.getConnectionId(ConnectorType.FROM),
+ job.getConnectorId(ConnectorType.FROM));
// Display connector part
- displayForms(job.getConnectorPart().getForms(),
- client.getResourceBundle(job.getConnectorId()));
+ displayForms(job.getConnectorPart(ConnectorType.FROM).getForms(),
+ client.getResourceBundle(job.getConnectorId(ConnectorType.FROM)));
displayForms(job.getFrameworkPart().getForms(),
client.getFrameworkResourceBundle());
+ displayForms(job.getConnectorPart(ConnectorType.TO).getForms(),
+ client.getResourceBundle(job.getConnectorId(ConnectorType.TO)));
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
index b060bb4..fbaf661 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
import jline.ConsoleReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.shell.core.Constants;
import org.apache.sqoop.shell.utils.FormDisplayer;
@@ -70,8 +71,11 @@ public class UpdateJobFunction extends SqoopFunction {
MJob job = client.getJob(jobId);
- ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+ ResourceBundle fromConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.FROM));
ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
+ ResourceBundle toConnectorBundle = client.getResourceBundle(
+ job.getConnectorId(ConnectorType.TO));
Status status = Status.FINE;
@@ -85,7 +89,7 @@ public class UpdateJobFunction extends SqoopFunction {
}
// Fill in data from user
- if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+ if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
return status;
}